diff options
-rw-r--r-- | jstests/aggregation/sources/changeNotification/change_notification.js | 216 | ||||
-rw-r--r-- | jstests/aggregation/sources/changeNotification/lookup_post_image.js | 79 | ||||
-rw-r--r-- | src/mongo/db/commands/getmore_cmd.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 19 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.cpp | 76 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.h | 3 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token.h | 79 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_check_resume_token_test.cpp | 191 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_sources.idl | 103 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_test.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.cpp | 79 | ||||
-rw-r--r-- | src/mongo/db/pipeline/resume_token.h | 75 | ||||
-rw-r--r-- | src/mongo/db/pipeline/value.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/pipeline/value.h | 5 |
16 files changed, 950 insertions, 125 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification.js b/jstests/aggregation/sources/changeNotification/change_notification.js index 98a7c0db6a0..386d679b14d 100644 --- a/jstests/aggregation/sources/changeNotification/change_notification.js +++ b/jstests/aggregation/sources/changeNotification/change_notification.js @@ -3,41 +3,39 @@ "use strict"; const oplogProjection = {$project: {"_id.ts": 0}}; + function getCollectionNameFromFullNamespace(ns) { + return ns.split(/\.(.+)/)[1]; + } - /** - * Tests the output of a $changeNotification stage, asserting only that the result at the end of - * the change stream on the collection 'collection' (the newest matching entry in the oplog) is - * equal to 'expectedResult'. - * - * Note this change assumes that the set of changes will fit within one batch. - */ - function checkLatestChange(expectedResult, collection) { - const cmdResponse = assert.commandWorked(db.runCommand({ - aggregate: collection.getName(), - pipeline: [ - {$changeNotification: {}}, - // Strip the oplog fields we aren't testing. - {$project: {"_id.ts": 0}} - ], - cursor: {} - })); - const firstBatch = cmdResponse.cursor.firstBatch; - assert.neq(firstBatch.length, 0); - assert.docEq(firstBatch[firstBatch.length - 1], expectedResult); + // Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges + // with the pipeline, then insert the changes, then run assertNextBatchMatches with the result + // of startWatchingChanges and the expected set of results. + function startWatchingChanges(pipeline, collection) { + // Strip the oplog fields we aren't testing. + pipeline.push(oplogProjection); + // Waiting for replication assures no previous operations will be included. + replTest.awaitReplication(); + let res = assert.commandWorked( + db.runCommand({aggregate: collection.getName(), "pipeline": pipeline, cursor: {}})); + assert.neq(res.cursor.id, 0); + return res.cursor; } - /** - * Tests that there are no changes in the 'collection'. - */ - function assertNoLatestChange(collection) { - const cmdResponse = assert.commandWorked(db.runCommand({ - aggregate: collection.getName(), - pipeline: [ - {$changeNotification: {}}, - ], - cursor: {} + function assertNextBatchMatches({cursor, expectedBatch}) { + replTest.awaitReplication(); + if (expectedBatch.length == 0) + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); + let res = assert.commandWorked(db.runCommand({ + getMore: cursor.id, + collection: getCollectionNameFromFullNamespace(cursor.ns), + maxTimeMS: 5 * 60 * 1000, + batchSize: (expectedBatch.length + 1) })); - assert.eq(cmdResponse.cursor.firstBatch.length, 0); + if (expectedBatch.length == 0) + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); + assert.docEq(res.cursor.nextBatch, expectedBatch); } let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1}); @@ -49,6 +47,7 @@ db.getMongo().forceReadMode('commands'); jsTestLog("Testing single insert"); + let cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); assert.writeOK(db.t1.insert({_id: 0, a: 1})); let expected = { _id: { @@ -60,9 +59,10 @@ ns: {coll: "t1", db: "test"}, operationType: "insert", }; - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing second insert"); + cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); assert.writeOK(db.t1.insert({_id: 1, a: 2})); expected = { _id: { @@ -74,9 +74,10 @@ ns: {coll: "t1", db: "test"}, operationType: "insert", }; - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing update"); + cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); assert.writeOK(db.t1.update({_id: 0}, {a: 3})); expected = { _id: {_id: 0, ns: "test.t1"}, @@ -85,9 +86,10 @@ ns: {coll: "t1", db: "test"}, operationType: "replace", }; - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing update of another field"); + cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); assert.writeOK(db.t1.update({_id: 0}, {b: 3})); expected = { _id: {_id: 0, ns: "test.t1"}, @@ -96,9 +98,10 @@ ns: {coll: "t1", db: "test"}, operationType: "replace", }; - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing upsert"); + cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true})); expected = { _id: { @@ -110,10 +113,11 @@ ns: {coll: "t1", db: "test"}, operationType: "insert", }; - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing partial update with $inc"); assert.writeOK(db.t1.insert({_id: 3, a: 5, b: 1})); + cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}})); expected = { _id: {_id: 3, ns: "test.t1"}, @@ -123,9 +127,10 @@ operationType: "update", updateDescription: {removedFields: [], updatedFields: {b: 3}}, }; - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing delete"); + cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); assert.writeOK(db.t1.remove({_id: 1})); expected = { _id: {_id: 1, ns: "test.t1"}, @@ -134,11 +139,13 @@ ns: {coll: "t1", db: "test"}, operationType: "delete", }; - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing intervening write on another collection"); + cursor = startWatchingChanges([{$changeNotification: {}}], db.t1); + let t2cursor = startWatchingChanges([{$changeNotification: {}}], db.t2); assert.writeOK(db.t2.insert({_id: 100, c: 1})); - checkLatestChange(expected, db.t1); + assertNextBatchMatches({cursor: cursor, expectedBatch: []}); expected = { _id: { _id: 100, @@ -149,21 +156,24 @@ ns: {coll: "t2", db: "test"}, operationType: "insert", }; - checkLatestChange(expected, db.t2); + assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]}); jsTestLog("Testing rename"); + t2cursor = startWatchingChanges([{$changeNotification: {}}], db.t2); assert.writeOK(db.t2.renameCollection("t3")); expected = {_id: {ns: "test.$cmd"}, operationType: "invalidate", fullDocument: null}; - checkLatestChange(expected, db.t2); + assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]}); jsTestLog("Testing insert that looks like rename"); + const dne1cursor = startWatchingChanges([{$changeNotification: {}}], db.dne1); + const dne2cursor = startWatchingChanges([{$changeNotification: {}}], db.dne2); assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); - assertNoLatestChange(db.dne1); - assertNoLatestChange(db.dne2); + assertNextBatchMatches({cursor: dne1cursor, expectedBatch: []}); + assertNextBatchMatches({cursor: dne2cursor, expectedBatch: []}); // Now make sure the cursor behaves like a tailable awaitData cursor. jsTestLog("Testing tailability"); - let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]); + const tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]); assert(!tailableCursor.hasNext()); assert.writeOK(db.tailable1.insert({_id: 101, a: 1})); assert(tailableCursor.hasNext()); @@ -192,9 +202,11 @@ // Initial batch size should be zero as there should be no data. assert.eq(aggcursor.firstBatch.length, 0); - // No data, so should return no results, but cursor should remain valid. + // No data, so should return no results, but cursor should remain valid. Note we are + // specifically testing awaitdata behavior here, so we cannot use the failpoint to skip the + // wait. res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50})); + db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 1000})); aggcursor = res.cursor; assert.neq(aggcursor.id, 0); assert.eq(aggcursor.nextBatch.length, 0); @@ -294,11 +306,117 @@ jsTestLog("Ensuring attempt to read with legacy operations fails."); db.getMongo().forceReadMode('legacy'); - tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection], - {cursor: {batchSize: 0}}); + const legacyCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection], + {cursor: {batchSize: 0}}); assert.throws(function() { - tailableCursor.next(); + legacyCursor.next(); }, [], "Legacy getMore expected to fail on changeNotification cursor."); + /** + * Gets one document from the cursor using getMore with awaitData disabled. Asserts if no + * document is present. + */ + function getOneDoc(cursor) { + replTest.awaitReplication(); + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); + let res = assert.commandWorked(db.runCommand({ + getMore: cursor.id, + collection: getCollectionNameFromFullNamespace(cursor.ns), + batchSize: 1 + })); + assert.eq(res.cursor.nextBatch.length, 1); + assert.commandWorked( + db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); + return res.cursor.nextBatch[0]; + } + + /** + * Attempts to get a document from the cursor with awaitData disabled, and asserts if a document + * is present. + */ + function assertNextBatchIsEmpty(cursor) { + replTest.awaitReplication(); + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); + let res = assert.commandWorked(db.runCommand({ + getMore: cursor.id, + collection: getCollectionNameFromFullNamespace(cursor.ns), + batchSize: 1 + })); + assert.eq(res.cursor.nextBatch.length, 0); + assert.commandWorked( + db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); + } + + jsTestLog("Testing resumability"); + assert.commandWorked(db.createCollection("resume1")); + + // Note we do not project away 'id.ts' as it is part of the resume token. + res = assert.commandWorked( + db.runCommand({aggregate: "resume1", pipeline: [{$changeNotification: {}}], cursor: {}})); + let resumeCursor = res.cursor; + assert.neq(resumeCursor.id, 0); + assert.eq(resumeCursor.firstBatch.length, 0); + + // Insert a document and save the resulting change notification. + assert.writeOK(db.resume1.insert({_id: 1})); + const firstInsertChangeDoc = getOneDoc(resumeCursor); + assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); + + jsTestLog("Testing resume after one document."); + res = assert.commandWorked(db.runCommand({ + aggregate: "resume1", + pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}], + cursor: {batchSize: 0} + })); + resumeCursor = res.cursor; + assertNextBatchIsEmpty(resumeCursor); + + jsTestLog("Inserting additional documents."); + assert.writeOK(db.resume1.insert({_id: 2})); + const secondInsertChangeDoc = getOneDoc(resumeCursor); + assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); + assert.writeOK(db.resume1.insert({_id: 3})); + const thirdInsertChangeDoc = getOneDoc(resumeCursor); + assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); + assertNextBatchIsEmpty(resumeCursor); + + jsTestLog("Testing resume after first document of three."); + res = assert.commandWorked(db.runCommand({ + aggregate: "resume1", + pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}], + cursor: {batchSize: 0} + })); + resumeCursor = res.cursor; + assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc); + assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc); + assertNextBatchIsEmpty(resumeCursor); + + jsTestLog("Testing resume after second document of three."); + res = assert.commandWorked(db.runCommand({ + aggregate: "resume1", + pipeline: [{$changeNotification: {resumeAfter: secondInsertChangeDoc._id}}], + cursor: {batchSize: 0} + })); + resumeCursor = res.cursor; + assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc); + assertNextBatchIsEmpty(resumeCursor); + + jsTestLog("Testing that resume is possible after the collection is dropped."); + assert(db.resume1.drop()); + const invalidateDoc = getOneDoc(resumeCursor); + assert.eq(invalidateDoc.operationType, "invalidate"); + res = assert.commandWorked(db.runCommand({ + aggregate: "resume1", + pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}], + cursor: {batchSize: 0} + })); + resumeCursor = res.cursor; + assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc); + assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc); + assert.docEq(getOneDoc(resumeCursor), invalidateDoc); + assertNextBatchIsEmpty(resumeCursor); + replTest.stopSet(); }()); diff --git a/jstests/aggregation/sources/changeNotification/lookup_post_image.js b/jstests/aggregation/sources/changeNotification/lookup_post_image.js index 99f589f6b98..cd97452e31f 100644 --- a/jstests/aggregation/sources/changeNotification/lookup_post_image.js +++ b/jstests/aggregation/sources/changeNotification/lookup_post_image.js @@ -23,13 +23,42 @@ return cmdResponse.cursor.firstBatch[cmdResponse.cursor.firstBatch.length - 1]; } - jsTestLog("Testing change streams without 'fullDocument' specified"); + function getCollectionNameFromFullNamespace(ns) { + return ns.split(/\.(.+)/)[1]; + } + + /** + * Gets one document from the cursor using getMore with awaitData disabled. Asserts if no + * document is present. + */ + function getOneDoc(cursor) { + replTest.awaitReplication(); + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); + let res = assert.commandWorked(db.runCommand({ + getMore: cursor.id, + collection: getCollectionNameFromFullNamespace(cursor.ns), + batchSize: 1 + })); + assert.eq(res.cursor.nextBatch.length, 1); + assert.commandWorked( + db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); + return res.cursor.nextBatch[0]; + } + + // Dummy document to give a resumeAfter point. + db.createCollection(coll.getName()); + let res = assert.commandWorked(db.runCommand( + {aggregate: coll.getName(), pipeline: [{$changeNotification: {}}], cursor: {}})); + assert.writeOK(coll.insert({_id: "dummy"})); + const firstChange = getOneDoc(res.cursor); + jsTestLog("Testing change streams without 'fullDocument' specified"); // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for an // insert. assert.writeOK(coll.insert({_id: "fullDocument not specified"})); - let latestChange = - getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]}); + let latestChange = getLastResultFromFirstBatch( + {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]}); assert.eq(latestChange.operationType, "insert"); assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified"}); @@ -37,16 +66,16 @@ // replacement-style update. assert.writeOK(coll.update({_id: "fullDocument not specified"}, {_id: "fullDocument not specified", replaced: true})); - latestChange = - getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]}); + latestChange = getLastResultFromFirstBatch( + {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]}); assert.eq(latestChange.operationType, "replace"); assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified", replaced: true}); // Test that not specifying 'fullDocument' does not include a 'fullDocument' in the result for // a non-replacement update. assert.writeOK(coll.update({_id: "fullDocument not specified"}, {$set: {updated: true}})); - latestChange = - getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]}); + latestChange = getLastResultFromFirstBatch( + {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]}); assert.eq(latestChange.operationType, "update"); assert.eq(null, latestChange.fullDocument); @@ -55,8 +84,10 @@ // Test that specifying 'fullDocument' as 'none' does include a 'fullDocument' in the result for // an insert. assert.writeOK(coll.insert({_id: "fullDocument is none"})); - latestChange = getLastResultFromFirstBatch( - {collection: coll, pipeline: [{$changeNotification: {fullDocument: "none"}}]}); + latestChange = getLastResultFromFirstBatch({ + collection: coll, + pipeline: [{$changeNotification: {fullDocument: "none", resumeAfter: firstChange._id}}] + }); assert.eq(latestChange.operationType, "insert"); assert.eq(latestChange.fullDocument, {_id: "fullDocument is none"}); @@ -64,16 +95,16 @@ // a replacement-style update. assert.writeOK( coll.update({_id: "fullDocument is none"}, {_id: "fullDocument is none", replaced: true})); - latestChange = - getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]}); + latestChange = getLastResultFromFirstBatch( + {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]}); assert.eq(latestChange.operationType, "replace"); assert.eq(latestChange.fullDocument, {_id: "fullDocument is none", replaced: true}); // Test that specifying 'fullDocument' as 'none' does not include a 'fullDocument' in the result // for a non-replacement update. assert.writeOK(coll.update({_id: "fullDocument is none"}, {$set: {updated: true}})); - latestChange = - getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]}); + latestChange = getLastResultFromFirstBatch( + {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]}); assert.eq(latestChange.operationType, "update"); assert.eq(null, latestChange.fullDocument); @@ -82,8 +113,10 @@ // Test that specifying 'fullDocument' as 'lookup' does include a 'fullDocument' in the result // for an insert. assert.writeOK(coll.insert({_id: "fullDocument is lookup"})); - latestChange = getLastResultFromFirstBatch( - {collection: coll, pipeline: [{$changeNotification: {fullDocument: "lookup"}}]}); + latestChange = getLastResultFromFirstBatch({ + collection: coll, + pipeline: [{$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}] + }); assert.eq(latestChange.operationType, "insert"); assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup"}); @@ -91,16 +124,20 @@ // for a replacement-style update. assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {_id: "fullDocument is lookup", replaced: true})); - latestChange = getLastResultFromFirstBatch( - {collection: coll, pipeline: [{$changeNotification: {fullDocument: "lookup"}}]}); + latestChange = getLastResultFromFirstBatch({ + collection: coll, + pipeline: [{$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}] + }); assert.eq(latestChange.operationType, "replace"); assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup", replaced: true}); // Test that specifying 'fullDocument' as 'lookup' does include a 'fullDocument' in the result // for a non-replacement update. assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updated: true}})); - latestChange = getLastResultFromFirstBatch( - {collection: coll, pipeline: [{$changeNotification: {fullDocument: "lookup"}}]}); + latestChange = getLastResultFromFirstBatch({ + collection: coll, + pipeline: [{$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}] + }); assert.eq(latestChange.operationType, "update"); assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup", replaced: true, updated: true}); @@ -111,7 +148,7 @@ latestChange = getLastResultFromFirstBatch({ collection: coll, pipeline: [ - {$changeNotification: {fullDocument: "lookup"}}, + {$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}, {$match: {operationType: "update"}} ] }); @@ -124,7 +161,7 @@ latestChange = getLastResultFromFirstBatch({ collection: coll, pipeline: [ - {$changeNotification: {fullDocument: "lookup"}}, + {$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}, {$match: {operationType: "update"}} ] }); diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp index 58a1622c49b..277d8f2e660 100644 --- a/src/mongo/db/commands/getmore_cmd.cpp +++ b/src/mongo/db/commands/getmore_cmd.cpp @@ -66,6 +66,9 @@ namespace mongo { namespace { MONGO_FP_DECLARE(rsStopGetMoreCmd); +// Failpoint for making getMore not wait for an awaitdata cursor. Allows us to avoid waiting during +// tests. +MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd); } // namespace /** @@ -281,6 +284,8 @@ public: const bool hasOwnMaxTime = opCtx->hasDeadline(); + const bool disableAwaitDataFailpointActive = + MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd); // We assume that cursors created through a DBDirectClient are always used from their // original OperationContext, so we do not need to move time to and from the cursor. if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) { @@ -288,7 +293,7 @@ public: // awaitData, then we supply a default time of one second. Otherwise we roll over // any leftover time from the maxTimeMS of the operation that spawned this cursor, // applying it to this getMore. - if (isCursorAwaitData(cursor)) { + if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) { opCtx->setDeadlineAfterNowBy(Seconds{1}); } else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) { opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros()); @@ -329,7 +334,7 @@ public: Explain::getSummaryStats(*exec, &preExecutionStats); // Mark this as an AwaitData operation if appropriate. - if (isCursorAwaitData(cursor)) { + if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) { if (request.lastKnownCommittedOpTime) clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get(); shouldWaitForInserts(opCtx) = true; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 093cbb5a5c9..e548ff4f464 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -130,6 +130,7 @@ env.CppUnitTest( 'document_source_bucket_auto_test.cpp', 'document_source_bucket_test.cpp', 'document_source_change_notification_test.cpp', + 'document_source_check_resume_token_test.cpp', 'document_source_count_test.cpp', 'document_source_current_op_test.cpp', 'document_source_geo_near_test.cpp', @@ -157,6 +158,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/query/query_test_service_context', '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/s/is_mongos', '$BUILD_DIR/mongo/util/clock_source_mock', @@ -264,6 +266,7 @@ docSourceEnv.Library( '$BUILD_DIR/mongo/db/matcher/expressions', '$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', '$BUILD_DIR/mongo/db/repl/oplog_entry', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/stats/top', '$BUILD_DIR/mongo/db/storage/encryption_hooks', @@ -272,6 +275,7 @@ docSourceEnv.Library( '$BUILD_DIR/third_party/shim_snappy', 'accumulator', 'dependencies', + 'document_sources_idl', 'document_value', 'expression', 'granularity_rounder', @@ -312,6 +316,7 @@ env.Library( target='document_source_lookup', source=[ 'document_source_change_notification.cpp', + 'document_source_check_resume_token.cpp', 'document_source_graph_lookup.cpp', 'document_source_lookup.cpp', 'document_source_lookup_change_post_image.cpp', @@ -391,6 +396,7 @@ env.CppUnitTest( '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', '$BUILD_DIR/mongo/db/query/collation/collator_interface_mock', '$BUILD_DIR/mongo/db/query/query_test_service_context', + '$BUILD_DIR/mongo/db/repl/replmocks', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/s/is_mongos', @@ -491,3 +497,16 @@ env.Library( '$BUILD_DIR/mongo/db/stats/serveronly', ], ) + +env.Library( + target='document_sources_idl', + source=[ + env.Idlc('document_sources.idl')[0], + 'resume_token.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/idl/idl_parser', + 'document_value', + ], +) diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index d173d1183d5..c463c3574c9 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -31,14 +31,17 @@ #include "mongo/db/pipeline/document_source_change_notification.h" #include "mongo/bson/simple_bsonelement_comparator.h" +#include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/pipeline/resume_token.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/util/log.h" namespace mongo { @@ -115,7 +118,9 @@ private: }; } // namespace -BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) { +BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss, + Timestamp startFrom, + bool isResume) { auto target = nss.ns(); // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. @@ -140,7 +145,9 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString auto opMatch = BSON("ns" << target); // Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops. - return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); + // Include the resume token if resuming, so we can verify it was still present in the oplog. + return BSON("ts" << (isResume ? GTE : GT) << startFrom << "$or" + << BSON_ARRAY(opMatch << commandMatch)); } list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( @@ -153,44 +160,39 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFrom "Only default collation is allowed when using a $changeNotification stage.", !expCtx->getCollator()); - uassert(40573, - str::stream() << "the $changeNotification stage must be specified as an object, got " - << typeName(elem.type()), - elem.type() == BSONType::Object); - - bool shouldLookupPostImage = false; - for (auto&& option : elem.embeddedObject()) { - auto optionName = option.fieldNameStringData(); - if (optionName == "fullDocument"_sd) { - uassert(40574, - str::stream() << "the 'fullDocument' option to the $changeNotification stage " - "must be a string, got " - << typeName(option.type()), - option.type() == BSONType::String); - auto fullDocOption = option.valueStringData(); - uassert(40575, - str::stream() << "unrecognized value for the 'fullDocument' option to the " - "$changeNotification stage. Expected \"none\" or " - "\"fullDocument\", got \"" - << option.String() - << "\"", - fullDocOption == "lookup"_sd || fullDocOption == "none"_sd); - shouldLookupPostImage = (fullDocOption == "lookup"_sd); - } else if (optionName == "resumeAfter"_sd) { - uasserted( - 40576, - "the 'resumeAfter' option to the $changeNotification stage is not yet supported"); - } else { - uasserted(40577, - str::stream() << "unrecognized option to $changeNotification stage: \"" - << optionName - << "\""); - } + auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx); + uassert(40573, "The $changeNotification stage is only supported on replica sets", replCoord); + Timestamp startFrom = replCoord->getLastCommittedOpTime().getTimestamp(); + + intrusive_ptr<DocumentSourceCheckResumeToken> resumeStage = nullptr; + auto spec = DocumentSourceChangeNotificationSpec::parse( + IDLParserErrorContext("$changeNotification"), elem.embeddedObject()); + if (auto resumeAfter = spec.getResumeAfter()) { + ResumeToken token = resumeAfter.get(); + startFrom = token.getTimestamp(); + DocumentSourceCheckResumeTokenSpec spec; + spec.setResumeToken(std::move(token)); + resumeStage = DocumentSourceCheckResumeToken::create(expCtx, std::move(spec)); } - - auto oplogMatch = DocumentSourceOplogMatch::create(buildMatchFilter(expCtx->ns), expCtx); + const bool changeStreamIsResuming = resumeStage != nullptr; + + auto fullDocOption = spec.getFullDocument(); + uassert(40575, + str::stream() << "unrecognized value for the 'fullDocument' option to the " + "$changeNotification stage. Expected \"none\" or " + "\"lookup\", got \"" + << fullDocOption + << "\"", + fullDocOption == "lookup"_sd || fullDocOption == "none"_sd); + const bool shouldLookupPostImage = (fullDocOption == "lookup"_sd); + + auto oplogMatch = DocumentSourceOplogMatch::create( + buildMatchFilter(expCtx->ns, startFrom, changeStreamIsResuming), expCtx); auto transformation = createTransformationStage(elem.embeddedObject(), expCtx); list<intrusive_ptr<DocumentSource>> stages = {oplogMatch, transformation}; + if (resumeStage) { + stages.push_back(resumeStage); + } if (shouldLookupPostImage) { stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); } diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h index ad10a5ad210..036c958888f 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.h +++ b/src/mongo/db/pipeline/document_source_change_notification.h @@ -30,6 +30,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" +#include "mongo/db/pipeline/document_sources_gen.h" namespace mongo { @@ -119,7 +120,7 @@ public: * Produce the BSON object representing the filter for the $match stage to filter oplog entries * to only those relevant for this $changeNotification stage. */ - static BSONObj buildMatchFilter(const NamespaceString& nss); + static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume); /** * Parses a $changeNotification stage from 'elem' and produces the $match and transformation diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index 9c08bd91637..ffc9460c96d 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/unittest/unittest.h" namespace mongo { @@ -65,9 +66,15 @@ static const Timestamp ts(100, 1); static const repl::OpTime optime(ts, 1); static const NamespaceString nss("unittests.change_notification"); +using ChangeNotificationStageTestNoSetup = AggregationContextFixture; + class ChangeNotificationStageTest : public AggregationContextFixture { public: - ChangeNotificationStageTest() : AggregationContextFixture(nss) {} + ChangeNotificationStageTest() : AggregationContextFixture(nss) { + repl::ReplicationCoordinator::set(getExpCtx()->opCtx->getServiceContext(), + stdx::make_unique<repl::ReplicationCoordinatorMock>( + getExpCtx()->opCtx->getServiceContext())); + } void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) { const auto spec = fromjson("{$changeNotification: {}}"); @@ -106,19 +113,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedOption) { BSON(DSChangeNotification::kStageName << BSON("unexpected" << 4)).firstElement(), expCtx), UserException, - 40577); -} - -TEST_F(ChangeNotificationStageTest, ShouldRejectResumeAfterOption) { - // TODO SERVER-29131 change this test to accept the option. - auto expCtx = getExpCtx(); - - ASSERT_THROWS_CODE( - DSChangeNotification::createFromBson( - BSON(DSChangeNotification::kStageName << BSON("resumeAfter" << ts)).firstElement(), - expCtx), - UserException, - 40576); + 40415); } TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) { @@ -129,7 +124,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) { BSON(DSChangeNotification::kStageName << BSON("fullDocument" << true)).firstElement(), expCtx), UserException, - 40574); + ErrorCodes::TypeMismatch); } TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) { @@ -144,6 +139,15 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) 40575); } +TEST_F(ChangeNotificationStageTestNoSetup, FailsWithNoReplicationCoordinator) { + const auto spec = fromjson("{$changeNotification: {}}"); + + ASSERT_THROWS_CODE( + DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()), + UserException, + 40573); +} + TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { const auto spec = fromjson("{$changeNotification: {}}"); diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp new file mode 100644 index 00000000000..beaa8844dcb --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_check_resume_token.h" + +using boost::intrusive_ptr; +namespace mongo { +const char* DocumentSourceCheckResumeToken::getSourceName() const { + return "$_checkResumeToken"; +} + +Value DocumentSourceCheckResumeToken::serialize( + boost::optional<ExplainOptions::Verbosity> explain) const { + // This stage is created by the DocumentSourceChangeNotification stage, so serializing it here + // would result in it being created twice. + return Value(); +} + +intrusive_ptr<DocumentSourceCheckResumeToken> DocumentSourceCheckResumeToken::create( + const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec) { + return new DocumentSourceCheckResumeToken(expCtx, std::move(spec)); +} + +DocumentSourceCheckResumeToken::DocumentSourceCheckResumeToken( + const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec) + : DocumentSource(expCtx), _token(spec.getResumeToken()), _seenDoc(false) {} + +DocumentSource::GetNextResult DocumentSourceCheckResumeToken::getNext() { + pExpCtx->checkForInterrupt(); + + auto nextInput = pSource->getNext(); + uassert(40584, + "resume of change notification was not possible, as no change data was found. ", + _seenDoc || !nextInput.isEOF()); + + if (_seenDoc || !nextInput.isAdvanced()) + return nextInput; + + _seenDoc = true; + auto doc = nextInput.getDocument(); + + ResumeToken receivedToken(doc["_id"]); + uassert( + 40585, + str::stream() + << "resume of change notification was not possible, as the resume token was not found. " + << receivedToken.toDocument().toString(), + receivedToken == _token); + // Don't return the document which has the token; the user has already seen it. + return pSource->getNext(); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h new file mode 100644 index 00000000000..13706b937b9 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_resume_token.h @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_sources_gen.h" +#include "mongo/db/pipeline/resume_token.h" + +namespace mongo { + +/** + * This stage is used internally for change notifications to ensure that the resume token is in the + * stream. It is not intended to be created by the user. + */ +class DocumentSourceCheckResumeToken final : public DocumentSource, + public SplittableDocumentSource { +public: + GetNextResult getNext() final; + const char* getSourceName() const final; + + /** + * SplittableDocumentSource methods; this has to run on the merger, since the resume point could + * be at any shard. + */ + boost::intrusive_ptr<DocumentSource> getShardSource() final { + return nullptr; + }; + boost::intrusive_ptr<DocumentSource> getMergeSource() final { + return this; + }; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final; + + static boost::intrusive_ptr<DocumentSourceCheckResumeToken> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + DocumentSourceCheckResumeTokenSpec spec); + + const ResumeToken& getTokenForTest() { + return _token; + } + +private: + /** + * Use the create static method to create a DocumentSourceCheckResumeToken. + */ + DocumentSourceCheckResumeToken(const boost::intrusive_ptr<ExpressionContext>& expCtx, + DocumentSourceCheckResumeTokenSpec spec); + + ResumeToken _token; + bool _seenDoc; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp new file mode 100644 index 00000000000..a9f057ee353 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <boost/intrusive_ptr.hpp> +#include <memory> + +#include "mongo/bson/bsonelement.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document_source_check_resume_token.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/resume_token.h" +#include "mongo/db/service_context.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/death_test.h" +#include "mongo/unittest/unittest.h" + +using boost::intrusive_ptr; + +namespace mongo { +namespace { +static constexpr StringData kTestNs = "test.ns"_sd; + +class CheckResumeTokenTest : public AggregationContextFixture { +public: + CheckResumeTokenTest() : _mock(DocumentSourceMock::create()) {} + +protected: + /** + * Puts an arbitrary document with resume token corresponding to the given timestamp, id, and + * namespace in the mock queue. + */ + void addDocument(Timestamp ts, std::string id, StringData ns = kTestNs) { + _mock->queue.push_back( + Document({{"_id", Document({{"ts", ts}, {"ns", ns}, {"_id", id}})}})); + } + + void addPause() { + _mock->queue.push_back(DocumentSource::GetNextResult::makePauseExecution()); + } + + /** + * Convenience method to create the class under test with a given timestamp, id, and namespace. + */ + intrusive_ptr<DocumentSourceCheckResumeToken> createCheckResumeToken(Timestamp ts, + StringData id, + StringData ns = kTestNs) { + auto token = ResumeToken::parse(BSON("ts" << ts << "_id" << id << "ns" << ns)); + DocumentSourceCheckResumeTokenSpec spec; + spec.setResumeToken(token); + auto checkResumeToken = DocumentSourceCheckResumeToken::create(getExpCtx(), spec); + checkResumeToken->setSource(_mock.get()); + return checkResumeToken; + } + +private: + intrusive_ptr<DocumentSourceMock> _mock; +}; + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + addDocument(resumeTimestamp, "1"); + // We should not see the resume token. + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + addPause(); + addDocument(resumeTimestamp, "1"); + + // We see the pause we inserted, but not the resume token. + ASSERT_TRUE(checkResumeToken->getNext().isPaused()); + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) { + Timestamp resumeTimestamp(100, 1); + Timestamp doc1Timestamp(100, 2); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + addDocument(resumeTimestamp, "1"); + addPause(); + addDocument(doc1Timestamp, "2"); + + // Pause added explicitly. + ASSERT_TRUE(checkResumeToken->getNext().isPaused()); + // The document after the resume token should be the first. + auto result1 = checkResumeToken->getNext(); + ASSERT_TRUE(result1.isAdvanced()); + auto& doc1 = result1.getDocument(); + ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]); + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + addDocument(resumeTimestamp, "0"); + + Timestamp doc1Timestamp(100, 2); + Timestamp doc2Timestamp(101, 1); + addDocument(doc1Timestamp, "1"); + addDocument(doc2Timestamp, "2"); + + auto result1 = checkResumeToken->getNext(); + ASSERT_TRUE(result1.isAdvanced()); + auto& doc1 = result1.getDocument(); + ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]); + auto result2 = checkResumeToken->getNext(); + ASSERT_TRUE(result2.isAdvanced()); + auto& doc2 = result2.getDocument(); + ASSERT_VALUE_EQ(Value(doc2Timestamp), doc2["_id"].getDocument()["ts"]); + ASSERT_TRUE(checkResumeToken->getNext().isEOF()); +} + +TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1"); + + Timestamp doc1Timestamp(100, 2); + Timestamp doc2Timestamp(101, 1); + addDocument(doc1Timestamp, "1"); + addDocument(doc2Timestamp, "2"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585); +} + +TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongDocumentId) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + addDocument(resumeTimestamp, "1"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585); +} + +TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", "test1.ns"); + addDocument(resumeTimestamp, "1", "test2.ns"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585); +} + +/** + * We should _error_ on the no-document case, because that means the resume token was not found. + */ +TEST_F(CheckResumeTokenTest, ShouldFailWithNoDocuments) { + Timestamp resumeTimestamp(100, 1); + + auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0"); + ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40584); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl new file mode 100644 index 00000000000..de67f25bb02 --- /dev/null +++ b/src/mongo/db/pipeline/document_sources.idl @@ -0,0 +1,103 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the GNU Affero General Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +# Document source pipeline stage IDL file + +global: + cpp_namespace: "mongo" + cpp_includes: + - "mongo/db/pipeline/resume_token.h" + +imports: + - "mongo/idl/basic_types.idl" + +types: + # A resume token could be parsed as a struct, but since we may make it opaque in the future, we + # parse it as a type with a custom class now. + resumeToken: + bson_serialization_type: object + description: An object representing a resume token for change notification + cpp_type: ResumeToken + serializer: ResumeToken::toBSON + deserializer: ResumeToken::parse + + # The _id element in a resume token can be any BSON element, so we need a custom type which + # leaves it as a BSONElement + resumeTokenOpaqueId: + bson_serialization_type: any + description: The document id contained within a resume token + cpp_type: Value + serializer: Value::serializeForIDL + deserializer: Value::deserializeForIDL + +structs: + DocumentSourceChangeNotificationSpec: + description: A document used to specify the $changeNotification stage of an aggregation + pipeline. + fields: + resumeAfter: + cpp_name: resumeAfter + type: resumeToken + optional: true + description: An object representing the point at which we should resume reporting + changes from. + fullDocument: + cpp_name: fullDocument + type: string + default: '"none"' + description: A string '"lookup"' or '"none"', indicating whether or not we should + return a full document or just changes for an update. + + + DocumentSourceCheckResumeTokenSpec: + description: A document used to specify the internal stage which checks the presence of the + resume token. + fields: + resumeToken: + cpp_name: resumeToken + type: resumeToken + description: The resume token which is required to be present in the pipeline. + + + ResumeTokenInternal: + description: The internal format of a resume token. For use by the ResumeToken class + only. + fields: + ts: + cpp_name: timestamp + type: timestamp + description: The timestamp of the oplog entry represented by this resume token. + + ns: + cpp_name: ns + type: string + description: The namespace of the oplog entry represented by this resume token. + + _id: + cpp_name: documentId + type: resumeTokenOpaqueId + description: The document key of the document in the oplog entry represented by this + resume token. diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 9909895feb3..91d04bdf2a2 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -48,6 +48,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/collation/collator_interface_mock.h" #include "mongo/db/query/query_test_service_context.h" +#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/dbtests/dbtests.h" namespace PipelineTests { @@ -58,6 +59,14 @@ using std::vector; const NamespaceString kTestNss = NamespaceString("a.collection"); +namespace { +void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) { + repl::ReplicationCoordinator::set( + opCtx->getServiceContext(), + stdx::make_unique<repl::ReplicationCoordinatorMock>(opCtx->getServiceContext())); +} +} // namespace + namespace Optimizations { using namespace mongo; @@ -974,6 +983,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupSwapsWithIndependentMatch intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeNotification" << BSON("fullDocument" << "lookup")); @@ -998,6 +1008,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupDoesNotSwapWithMatchOnPos intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); expCtx->opCtx = opCtx.get(); + setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx); auto spec = BSON("$changeNotification" << BSON("fullDocument" << "lookup")); @@ -1472,6 +1483,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) { const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")}; auto ctx = getExpCtx(); + setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus()); } @@ -1480,6 +1492,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage) const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), fromjson("{$changeNotification: {}}")}; auto ctx = getExpCtx(); + setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus(); ASSERT_EQ(parseStatus, ErrorCodes::BadValue); @@ -1490,6 +1503,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageI const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"), fromjson("{$changeNotification: {}}")}; auto ctx = getExpCtx(); + setMockReplicationCoordinatorOnOpCtx(ctx->opCtx); ctx->ns = NamespaceString("a.collection"); auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus(); ASSERT_EQ(parseStatus, ErrorCodes::BadValue); diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp new file mode 100644 index 00000000000..5b409f1f113 --- /dev/null +++ b/src/mongo/db/pipeline/resume_token.cpp @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/resume_token.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_sources_gen.h" +#include "mongo/db/pipeline/value_comparator.h" + +namespace mongo { + +ResumeToken::ResumeToken(const BSONObj& resumeBson) { + auto token = ResumeTokenInternal::parse( + IDLParserErrorContext("$changeNotification.resumeAfter"), resumeBson); + _timestamp = token.getTimestamp(); + _namespace = token.getNs().toString(); + _documentId = token.getDocumentId(); +} + +ResumeToken::ResumeToken(const Value& resumeValue) { + Document resumeTokenDoc = resumeValue.getDocument(); + Value timestamp = resumeTokenDoc[ResumeTokenInternal::kTimestampFieldName]; + _timestamp = timestamp.getTimestamp(); + Value ns = resumeTokenDoc[ResumeTokenInternal::kNsFieldName]; + _namespace = ns.getString(); + _documentId = resumeTokenDoc[ResumeTokenInternal::kDocumentIdFieldName]; +} + +bool ResumeToken::operator==(const ResumeToken& other) { + return _timestamp == other._timestamp && _namespace == other._namespace && + ValueComparator::kInstance.evaluate(_documentId == other._documentId); +} + +Document ResumeToken::toDocument() const { + return Document({{ResumeTokenInternal::kTimestampFieldName, _timestamp}, + {{ResumeTokenInternal::kNsFieldName}, _namespace}, + {{ResumeTokenInternal::kDocumentIdFieldName}, _documentId}}); +} + +BSONObj ResumeToken::toBSON() const { + return BSON( + ResumeTokenInternal::kTimestampFieldName << _timestamp << ResumeTokenInternal::kNsFieldName + << _namespace + << ResumeTokenInternal::kDocumentIdFieldName + << _documentId); +} + +ResumeToken ResumeToken::parse(const BSONObj& resumeBson) { + return ResumeToken(resumeBson); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h new file mode 100644 index 00000000000..733e285e599 --- /dev/null +++ b/src/mongo/db/pipeline/resume_token.h @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/base/string_data.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/pipeline/value.h" + +namespace mongo { +/** + * A token passed in by the user to indicate where in the oplog we should start for + * $changeNotification. + */ +class ResumeToken { +public: + /** + * The default no-argument constructor is required by the IDL for types used as non-optional + * fields. + */ + ResumeToken() = default; + explicit ResumeToken(const Value& resumeValue); + bool operator==(const ResumeToken&); + + Timestamp getTimestamp() const { + return _timestamp; + } + + Document toDocument() const; + + BSONObj toBSON() const; + + /** + * Parse a resume token from a BSON object; used as an interface to the IDL parser. + */ + static ResumeToken parse(const BSONObj& obj); + +private: + /** + * Construct from a BSON object. + * External callers should use the static ResumeToken::parse(const BSONObj&) method instead. + */ + explicit ResumeToken(const BSONObj& resumeBson); + + Timestamp _timestamp; + std::string _namespace; + Value _documentId; +}; +} // namespace mongo diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp index d70c6b65cc9..a7bce3fc777 100644 --- a/src/mongo/db/pipeline/value.cpp +++ b/src/mongo/db/pipeline/value.cpp @@ -1323,4 +1323,17 @@ Value Value::deserializeForSorter(BufReader& buf, const SorterDeserializeSetting } verify(false); } + +void Value::serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const { + addToBsonObj(builder, fieldName); +} + +void Value::serializeForIDL(BSONArrayBuilder* builder) const { + addToBsonArray(builder); } + +Value Value::deserializeForIDL(const BSONElement& element) { + return Value(element); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h index 04e396dc58d..40c05d89b9a 100644 --- a/src/mongo/db/pipeline/value.h +++ b/src/mongo/db/pipeline/value.h @@ -328,6 +328,11 @@ public: return *this; } + /// Members to support parsing/deserialization from IDL generated code. + void serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const; + void serializeForIDL(BSONArrayBuilder* builder) const; + static Value deserializeForIDL(const BSONElement& element); + private: /** This is a "honeypot" to prevent unexpected implicit conversions to the accepted argument * types. bool is especially bad since without this it will accept any pointer. |