summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/aggregation/sources/changeNotification/change_notification.js216
-rw-r--r--jstests/aggregation/sources/changeNotification/lookup_post_image.js79
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp9
-rw-r--r--src/mongo/db/pipeline/SConscript19
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp76
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp34
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.cpp80
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h79
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token_test.cpp191
-rw-r--r--src/mongo/db/pipeline/document_sources.idl103
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp14
-rw-r--r--src/mongo/db/pipeline/resume_token.cpp79
-rw-r--r--src/mongo/db/pipeline/resume_token.h75
-rw-r--r--src/mongo/db/pipeline/value.cpp13
-rw-r--r--src/mongo/db/pipeline/value.h5
16 files changed, 950 insertions, 125 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification.js b/jstests/aggregation/sources/changeNotification/change_notification.js
index 98a7c0db6a0..386d679b14d 100644
--- a/jstests/aggregation/sources/changeNotification/change_notification.js
+++ b/jstests/aggregation/sources/changeNotification/change_notification.js
@@ -3,41 +3,39 @@
"use strict";
const oplogProjection = {$project: {"_id.ts": 0}};
+ function getCollectionNameFromFullNamespace(ns) {
+ return ns.split(/\.(.+)/)[1];
+ }
- /**
- * Tests the output of a $changeNotification stage, asserting only that the result at the end of
- * the change stream on the collection 'collection' (the newest matching entry in the oplog) is
- * equal to 'expectedResult'.
- *
- * Note this change assumes that the set of changes will fit within one batch.
- */
- function checkLatestChange(expectedResult, collection) {
- const cmdResponse = assert.commandWorked(db.runCommand({
- aggregate: collection.getName(),
- pipeline: [
- {$changeNotification: {}},
- // Strip the oplog fields we aren't testing.
- {$project: {"_id.ts": 0}}
- ],
- cursor: {}
- }));
- const firstBatch = cmdResponse.cursor.firstBatch;
- assert.neq(firstBatch.length, 0);
- assert.docEq(firstBatch[firstBatch.length - 1], expectedResult);
+ // Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges
+ // with the pipeline, then insert the changes, then run assertNextBatchMatches with the result
+ // of startWatchingChanges and the expected set of results.
+ function startWatchingChanges(pipeline, collection) {
+ // Strip the oplog fields we aren't testing.
+ pipeline.push(oplogProjection);
+ // Waiting for replication assures no previous operations will be included.
+ replTest.awaitReplication();
+ let res = assert.commandWorked(
+ db.runCommand({aggregate: collection.getName(), "pipeline": pipeline, cursor: {}}));
+ assert.neq(res.cursor.id, 0);
+ return res.cursor;
}
- /**
- * Tests that there are no changes in the 'collection'.
- */
- function assertNoLatestChange(collection) {
- const cmdResponse = assert.commandWorked(db.runCommand({
- aggregate: collection.getName(),
- pipeline: [
- {$changeNotification: {}},
- ],
- cursor: {}
+ function assertNextBatchMatches({cursor, expectedBatch}) {
+ replTest.awaitReplication();
+ if (expectedBatch.length == 0)
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
+ let res = assert.commandWorked(db.runCommand({
+ getMore: cursor.id,
+ collection: getCollectionNameFromFullNamespace(cursor.ns),
+ maxTimeMS: 5 * 60 * 1000,
+ batchSize: (expectedBatch.length + 1)
}));
- assert.eq(cmdResponse.cursor.firstBatch.length, 0);
+ if (expectedBatch.length == 0)
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
+ assert.docEq(res.cursor.nextBatch, expectedBatch);
}
let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1});
@@ -49,6 +47,7 @@
db.getMongo().forceReadMode('commands');
jsTestLog("Testing single insert");
+ let cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
let expected = {
_id: {
@@ -60,9 +59,10 @@
ns: {coll: "t1", db: "test"},
operationType: "insert",
};
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing second insert");
+ cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
assert.writeOK(db.t1.insert({_id: 1, a: 2}));
expected = {
_id: {
@@ -74,9 +74,10 @@
ns: {coll: "t1", db: "test"},
operationType: "insert",
};
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing update");
+ cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
assert.writeOK(db.t1.update({_id: 0}, {a: 3}));
expected = {
_id: {_id: 0, ns: "test.t1"},
@@ -85,9 +86,10 @@
ns: {coll: "t1", db: "test"},
operationType: "replace",
};
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing update of another field");
+ cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
assert.writeOK(db.t1.update({_id: 0}, {b: 3}));
expected = {
_id: {_id: 0, ns: "test.t1"},
@@ -96,9 +98,10 @@
ns: {coll: "t1", db: "test"},
operationType: "replace",
};
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing upsert");
+ cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true}));
expected = {
_id: {
@@ -110,10 +113,11 @@
ns: {coll: "t1", db: "test"},
operationType: "insert",
};
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing partial update with $inc");
assert.writeOK(db.t1.insert({_id: 3, a: 5, b: 1}));
+ cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}}));
expected = {
_id: {_id: 3, ns: "test.t1"},
@@ -123,9 +127,10 @@
operationType: "update",
updateDescription: {removedFields: [], updatedFields: {b: 3}},
};
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing delete");
+ cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
assert.writeOK(db.t1.remove({_id: 1}));
expected = {
_id: {_id: 1, ns: "test.t1"},
@@ -134,11 +139,13 @@
ns: {coll: "t1", db: "test"},
operationType: "delete",
};
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing intervening write on another collection");
+ cursor = startWatchingChanges([{$changeNotification: {}}], db.t1);
+ let t2cursor = startWatchingChanges([{$changeNotification: {}}], db.t2);
assert.writeOK(db.t2.insert({_id: 100, c: 1}));
- checkLatestChange(expected, db.t1);
+ assertNextBatchMatches({cursor: cursor, expectedBatch: []});
expected = {
_id: {
_id: 100,
@@ -149,21 +156,24 @@
ns: {coll: "t2", db: "test"},
operationType: "insert",
};
- checkLatestChange(expected, db.t2);
+ assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]});
jsTestLog("Testing rename");
+ t2cursor = startWatchingChanges([{$changeNotification: {}}], db.t2);
assert.writeOK(db.t2.renameCollection("t3"));
expected = {_id: {ns: "test.$cmd"}, operationType: "invalidate", fullDocument: null};
- checkLatestChange(expected, db.t2);
+ assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]});
jsTestLog("Testing insert that looks like rename");
+ const dne1cursor = startWatchingChanges([{$changeNotification: {}}], db.dne1);
+ const dne2cursor = startWatchingChanges([{$changeNotification: {}}], db.dne2);
assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
- assertNoLatestChange(db.dne1);
- assertNoLatestChange(db.dne2);
+ assertNextBatchMatches({cursor: dne1cursor, expectedBatch: []});
+ assertNextBatchMatches({cursor: dne2cursor, expectedBatch: []});
// Now make sure the cursor behaves like a tailable awaitData cursor.
jsTestLog("Testing tailability");
- let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
+ const tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]);
assert(!tailableCursor.hasNext());
assert.writeOK(db.tailable1.insert({_id: 101, a: 1}));
assert(tailableCursor.hasNext());
@@ -192,9 +202,11 @@
// Initial batch size should be zero as there should be no data.
assert.eq(aggcursor.firstBatch.length, 0);
- // No data, so should return no results, but cursor should remain valid.
+ // No data, so should return no results, but cursor should remain valid. Note we are
+ // specifically testing awaitdata behavior here, so we cannot use the failpoint to skip the
+ // wait.
res = assert.commandWorked(
- db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50}));
+ db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 1000}));
aggcursor = res.cursor;
assert.neq(aggcursor.id, 0);
assert.eq(aggcursor.nextBatch.length, 0);
@@ -294,11 +306,117 @@
jsTestLog("Ensuring attempt to read with legacy operations fails.");
db.getMongo().forceReadMode('legacy');
- tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
- {cursor: {batchSize: 0}});
+ const legacyCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection],
+ {cursor: {batchSize: 0}});
assert.throws(function() {
- tailableCursor.next();
+ legacyCursor.next();
}, [], "Legacy getMore expected to fail on changeNotification cursor.");
+ /**
+ * Gets one document from the cursor using getMore with awaitData disabled. Asserts if no
+ * document is present.
+ */
+ function getOneDoc(cursor) {
+ replTest.awaitReplication();
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
+ let res = assert.commandWorked(db.runCommand({
+ getMore: cursor.id,
+ collection: getCollectionNameFromFullNamespace(cursor.ns),
+ batchSize: 1
+ }));
+ assert.eq(res.cursor.nextBatch.length, 1);
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
+ return res.cursor.nextBatch[0];
+ }
+
+ /**
+ * Attempts to get a document from the cursor with awaitData disabled, and asserts if a document
+ * is present.
+ */
+ function assertNextBatchIsEmpty(cursor) {
+ replTest.awaitReplication();
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
+ let res = assert.commandWorked(db.runCommand({
+ getMore: cursor.id,
+ collection: getCollectionNameFromFullNamespace(cursor.ns),
+ batchSize: 1
+ }));
+ assert.eq(res.cursor.nextBatch.length, 0);
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
+ }
+
+ jsTestLog("Testing resumability");
+ assert.commandWorked(db.createCollection("resume1"));
+
+ // Note we do not project away 'id.ts' as it is part of the resume token.
+ res = assert.commandWorked(
+ db.runCommand({aggregate: "resume1", pipeline: [{$changeNotification: {}}], cursor: {}}));
+ let resumeCursor = res.cursor;
+ assert.neq(resumeCursor.id, 0);
+ assert.eq(resumeCursor.firstBatch.length, 0);
+
+ // Insert a document and save the resulting change notification.
+ assert.writeOK(db.resume1.insert({_id: 1}));
+ const firstInsertChangeDoc = getOneDoc(resumeCursor);
+ assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
+
+ jsTestLog("Testing resume after one document.");
+ res = assert.commandWorked(db.runCommand({
+ aggregate: "resume1",
+ pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}],
+ cursor: {batchSize: 0}
+ }));
+ resumeCursor = res.cursor;
+ assertNextBatchIsEmpty(resumeCursor);
+
+ jsTestLog("Inserting additional documents.");
+ assert.writeOK(db.resume1.insert({_id: 2}));
+ const secondInsertChangeDoc = getOneDoc(resumeCursor);
+ assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
+ assert.writeOK(db.resume1.insert({_id: 3}));
+ const thirdInsertChangeDoc = getOneDoc(resumeCursor);
+ assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
+ assertNextBatchIsEmpty(resumeCursor);
+
+ jsTestLog("Testing resume after first document of three.");
+ res = assert.commandWorked(db.runCommand({
+ aggregate: "resume1",
+ pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}],
+ cursor: {batchSize: 0}
+ }));
+ resumeCursor = res.cursor;
+ assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc);
+ assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
+ assertNextBatchIsEmpty(resumeCursor);
+
+ jsTestLog("Testing resume after second document of three.");
+ res = assert.commandWorked(db.runCommand({
+ aggregate: "resume1",
+ pipeline: [{$changeNotification: {resumeAfter: secondInsertChangeDoc._id}}],
+ cursor: {batchSize: 0}
+ }));
+ resumeCursor = res.cursor;
+ assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
+ assertNextBatchIsEmpty(resumeCursor);
+
+ jsTestLog("Testing that resume is possible after the collection is dropped.");
+ assert(db.resume1.drop());
+ const invalidateDoc = getOneDoc(resumeCursor);
+ assert.eq(invalidateDoc.operationType, "invalidate");
+ res = assert.commandWorked(db.runCommand({
+ aggregate: "resume1",
+ pipeline: [{$changeNotification: {resumeAfter: firstInsertChangeDoc._id}}],
+ cursor: {batchSize: 0}
+ }));
+ resumeCursor = res.cursor;
+ assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc);
+ assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
+ assert.docEq(getOneDoc(resumeCursor), invalidateDoc);
+ assertNextBatchIsEmpty(resumeCursor);
+
replTest.stopSet();
}());
diff --git a/jstests/aggregation/sources/changeNotification/lookup_post_image.js b/jstests/aggregation/sources/changeNotification/lookup_post_image.js
index 99f589f6b98..cd97452e31f 100644
--- a/jstests/aggregation/sources/changeNotification/lookup_post_image.js
+++ b/jstests/aggregation/sources/changeNotification/lookup_post_image.js
@@ -23,13 +23,42 @@
return cmdResponse.cursor.firstBatch[cmdResponse.cursor.firstBatch.length - 1];
}
- jsTestLog("Testing change streams without 'fullDocument' specified");
+ function getCollectionNameFromFullNamespace(ns) {
+ return ns.split(/\.(.+)/)[1];
+ }
+
+ /**
+ * Gets one document from the cursor using getMore with awaitData disabled. Asserts if no
+ * document is present.
+ */
+ function getOneDoc(cursor) {
+ replTest.awaitReplication();
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
+ let res = assert.commandWorked(db.runCommand({
+ getMore: cursor.id,
+ collection: getCollectionNameFromFullNamespace(cursor.ns),
+ batchSize: 1
+ }));
+ assert.eq(res.cursor.nextBatch.length, 1);
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
+ return res.cursor.nextBatch[0];
+ }
+
+ // Dummy document to give a resumeAfter point.
+ db.createCollection(coll.getName());
+ let res = assert.commandWorked(db.runCommand(
+ {aggregate: coll.getName(), pipeline: [{$changeNotification: {}}], cursor: {}}));
+ assert.writeOK(coll.insert({_id: "dummy"}));
+ const firstChange = getOneDoc(res.cursor);
+ jsTestLog("Testing change streams without 'fullDocument' specified");
// Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for an
// insert.
assert.writeOK(coll.insert({_id: "fullDocument not specified"}));
- let latestChange =
- getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]});
+ let latestChange = getLastResultFromFirstBatch(
+ {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]});
assert.eq(latestChange.operationType, "insert");
assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified"});
@@ -37,16 +66,16 @@
// replacement-style update.
assert.writeOK(coll.update({_id: "fullDocument not specified"},
{_id: "fullDocument not specified", replaced: true}));
- latestChange =
- getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]});
+ latestChange = getLastResultFromFirstBatch(
+ {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]});
assert.eq(latestChange.operationType, "replace");
assert.eq(latestChange.fullDocument, {_id: "fullDocument not specified", replaced: true});
// Test that not specifying 'fullDocument' does not include a 'fullDocument' in the result for
// a non-replacement update.
assert.writeOK(coll.update({_id: "fullDocument not specified"}, {$set: {updated: true}}));
- latestChange =
- getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]});
+ latestChange = getLastResultFromFirstBatch(
+ {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]});
assert.eq(latestChange.operationType, "update");
assert.eq(null, latestChange.fullDocument);
@@ -55,8 +84,10 @@
// Test that specifying 'fullDocument' as 'none' does include a 'fullDocument' in the result for
// an insert.
assert.writeOK(coll.insert({_id: "fullDocument is none"}));
- latestChange = getLastResultFromFirstBatch(
- {collection: coll, pipeline: [{$changeNotification: {fullDocument: "none"}}]});
+ latestChange = getLastResultFromFirstBatch({
+ collection: coll,
+ pipeline: [{$changeNotification: {fullDocument: "none", resumeAfter: firstChange._id}}]
+ });
assert.eq(latestChange.operationType, "insert");
assert.eq(latestChange.fullDocument, {_id: "fullDocument is none"});
@@ -64,16 +95,16 @@
// a replacement-style update.
assert.writeOK(
coll.update({_id: "fullDocument is none"}, {_id: "fullDocument is none", replaced: true}));
- latestChange =
- getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]});
+ latestChange = getLastResultFromFirstBatch(
+ {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]});
assert.eq(latestChange.operationType, "replace");
assert.eq(latestChange.fullDocument, {_id: "fullDocument is none", replaced: true});
// Test that specifying 'fullDocument' as 'none' does not include a 'fullDocument' in the result
// for a non-replacement update.
assert.writeOK(coll.update({_id: "fullDocument is none"}, {$set: {updated: true}}));
- latestChange =
- getLastResultFromFirstBatch({collection: coll, pipeline: [{$changeNotification: {}}]});
+ latestChange = getLastResultFromFirstBatch(
+ {collection: coll, pipeline: [{$changeNotification: {resumeAfter: firstChange._id}}]});
assert.eq(latestChange.operationType, "update");
assert.eq(null, latestChange.fullDocument);
@@ -82,8 +113,10 @@
// Test that specifying 'fullDocument' as 'lookup' does include a 'fullDocument' in the result
// for an insert.
assert.writeOK(coll.insert({_id: "fullDocument is lookup"}));
- latestChange = getLastResultFromFirstBatch(
- {collection: coll, pipeline: [{$changeNotification: {fullDocument: "lookup"}}]});
+ latestChange = getLastResultFromFirstBatch({
+ collection: coll,
+ pipeline: [{$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}]
+ });
assert.eq(latestChange.operationType, "insert");
assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup"});
@@ -91,16 +124,20 @@
// for a replacement-style update.
assert.writeOK(coll.update({_id: "fullDocument is lookup"},
{_id: "fullDocument is lookup", replaced: true}));
- latestChange = getLastResultFromFirstBatch(
- {collection: coll, pipeline: [{$changeNotification: {fullDocument: "lookup"}}]});
+ latestChange = getLastResultFromFirstBatch({
+ collection: coll,
+ pipeline: [{$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}]
+ });
assert.eq(latestChange.operationType, "replace");
assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup", replaced: true});
// Test that specifying 'fullDocument' as 'lookup' does include a 'fullDocument' in the result
// for a non-replacement update.
assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updated: true}}));
- latestChange = getLastResultFromFirstBatch(
- {collection: coll, pipeline: [{$changeNotification: {fullDocument: "lookup"}}]});
+ latestChange = getLastResultFromFirstBatch({
+ collection: coll,
+ pipeline: [{$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}}]
+ });
assert.eq(latestChange.operationType, "update");
assert.eq(latestChange.fullDocument,
{_id: "fullDocument is lookup", replaced: true, updated: true});
@@ -111,7 +148,7 @@
latestChange = getLastResultFromFirstBatch({
collection: coll,
pipeline: [
- {$changeNotification: {fullDocument: "lookup"}},
+ {$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}},
{$match: {operationType: "update"}}
]
});
@@ -124,7 +161,7 @@
latestChange = getLastResultFromFirstBatch({
collection: coll,
pipeline: [
- {$changeNotification: {fullDocument: "lookup"}},
+ {$changeNotification: {fullDocument: "lookup", resumeAfter: firstChange._id}},
{$match: {operationType: "update"}}
]
});
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 58a1622c49b..277d8f2e660 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -66,6 +66,9 @@ namespace mongo {
namespace {
MONGO_FP_DECLARE(rsStopGetMoreCmd);
+// Failpoint for making getMore not wait for an awaitdata cursor. Allows us to avoid waiting during
+// tests.
+MONGO_FP_DECLARE(disableAwaitDataForGetMoreCmd);
} // namespace
/**
@@ -281,6 +284,8 @@ public:
const bool hasOwnMaxTime = opCtx->hasDeadline();
+ const bool disableAwaitDataFailpointActive =
+ MONGO_FAIL_POINT(disableAwaitDataForGetMoreCmd);
// We assume that cursors created through a DBDirectClient are always used from their
// original OperationContext, so we do not need to move time to and from the cursor.
if (!hasOwnMaxTime && !opCtx->getClient()->isInDirectClient()) {
@@ -288,7 +293,7 @@ public:
// awaitData, then we supply a default time of one second. Otherwise we roll over
// any leftover time from the maxTimeMS of the operation that spawned this cursor,
// applying it to this getMore.
- if (isCursorAwaitData(cursor)) {
+ if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) {
opCtx->setDeadlineAfterNowBy(Seconds{1});
} else if (cursor->getLeftoverMaxTimeMicros() < Microseconds::max()) {
opCtx->setDeadlineAfterNowBy(cursor->getLeftoverMaxTimeMicros());
@@ -329,7 +334,7 @@ public:
Explain::getSummaryStats(*exec, &preExecutionStats);
// Mark this as an AwaitData operation if appropriate.
- if (isCursorAwaitData(cursor)) {
+ if (isCursorAwaitData(cursor) && !disableAwaitDataFailpointActive) {
if (request.lastKnownCommittedOpTime)
clientsLastKnownCommittedOpTime(opCtx) = request.lastKnownCommittedOpTime.get();
shouldWaitForInserts(opCtx) = true;
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 093cbb5a5c9..e548ff4f464 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -130,6 +130,7 @@ env.CppUnitTest(
'document_source_bucket_auto_test.cpp',
'document_source_bucket_test.cpp',
'document_source_change_notification_test.cpp',
+ 'document_source_check_resume_token_test.cpp',
'document_source_count_test.cpp',
'document_source_current_op_test.cpp',
'document_source_geo_near_test.cpp',
@@ -157,6 +158,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/s/is_mongos',
'$BUILD_DIR/mongo/util/clock_source_mock',
@@ -264,6 +266,7 @@ docSourceEnv.Library(
'$BUILD_DIR/mongo/db/matcher/expressions',
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
+ '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/stats/top',
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
@@ -272,6 +275,7 @@ docSourceEnv.Library(
'$BUILD_DIR/third_party/shim_snappy',
'accumulator',
'dependencies',
+ 'document_sources_idl',
'document_value',
'expression',
'granularity_rounder',
@@ -312,6 +316,7 @@ env.Library(
target='document_source_lookup',
source=[
'document_source_change_notification.cpp',
+ 'document_source_check_resume_token.cpp',
'document_source_graph_lookup.cpp',
'document_source_lookup.cpp',
'document_source_lookup_change_post_image.cpp',
@@ -391,6 +396,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
'$BUILD_DIR/mongo/db/query/collation/collator_interface_mock',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
+ '$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/s/is_mongos',
@@ -491,3 +497,16 @@ env.Library(
'$BUILD_DIR/mongo/db/stats/serveronly',
],
)
+
+env.Library(
+ target='document_sources_idl',
+ source=[
+ env.Idlc('document_sources.idl')[0],
+ 'resume_token.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/idl/idl_parser',
+ 'document_value',
+ ],
+)
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index d173d1183d5..c463c3574c9 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -31,14 +31,17 @@
#include "mongo/db/pipeline/document_source_change_notification.h"
#include "mongo/bson/simple_bsonelement_comparator.h"
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -115,7 +118,9 @@ private:
};
} // namespace
-BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss) {
+BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString& nss,
+ Timestamp startFrom,
+ bool isResume) {
auto target = nss.ns();
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
@@ -140,7 +145,9 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString
auto opMatch = BSON("ns" << target);
// Match oplog entries after "start" and are either (3) supported commands or (4) CRUD ops.
- return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch));
+ // Include the resume token if resuming, so we can verify it was still present in the oplog.
+ return BSON("ts" << (isResume ? GTE : GT) << startFrom << "$or"
+ << BSON_ARRAY(opMatch << commandMatch));
}
list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson(
@@ -153,44 +160,39 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFrom
"Only default collation is allowed when using a $changeNotification stage.",
!expCtx->getCollator());
- uassert(40573,
- str::stream() << "the $changeNotification stage must be specified as an object, got "
- << typeName(elem.type()),
- elem.type() == BSONType::Object);
-
- bool shouldLookupPostImage = false;
- for (auto&& option : elem.embeddedObject()) {
- auto optionName = option.fieldNameStringData();
- if (optionName == "fullDocument"_sd) {
- uassert(40574,
- str::stream() << "the 'fullDocument' option to the $changeNotification stage "
- "must be a string, got "
- << typeName(option.type()),
- option.type() == BSONType::String);
- auto fullDocOption = option.valueStringData();
- uassert(40575,
- str::stream() << "unrecognized value for the 'fullDocument' option to the "
- "$changeNotification stage. Expected \"none\" or "
- "\"fullDocument\", got \""
- << option.String()
- << "\"",
- fullDocOption == "lookup"_sd || fullDocOption == "none"_sd);
- shouldLookupPostImage = (fullDocOption == "lookup"_sd);
- } else if (optionName == "resumeAfter"_sd) {
- uasserted(
- 40576,
- "the 'resumeAfter' option to the $changeNotification stage is not yet supported");
- } else {
- uasserted(40577,
- str::stream() << "unrecognized option to $changeNotification stage: \""
- << optionName
- << "\"");
- }
+ auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
+ uassert(40573, "The $changeNotification stage is only supported on replica sets", replCoord);
+ Timestamp startFrom = replCoord->getLastCommittedOpTime().getTimestamp();
+
+ intrusive_ptr<DocumentSourceCheckResumeToken> resumeStage = nullptr;
+ auto spec = DocumentSourceChangeNotificationSpec::parse(
+ IDLParserErrorContext("$changeNotification"), elem.embeddedObject());
+ if (auto resumeAfter = spec.getResumeAfter()) {
+ ResumeToken token = resumeAfter.get();
+ startFrom = token.getTimestamp();
+ DocumentSourceCheckResumeTokenSpec spec;
+ spec.setResumeToken(std::move(token));
+ resumeStage = DocumentSourceCheckResumeToken::create(expCtx, std::move(spec));
}
-
- auto oplogMatch = DocumentSourceOplogMatch::create(buildMatchFilter(expCtx->ns), expCtx);
+ const bool changeStreamIsResuming = resumeStage != nullptr;
+
+ auto fullDocOption = spec.getFullDocument();
+ uassert(40575,
+ str::stream() << "unrecognized value for the 'fullDocument' option to the "
+ "$changeNotification stage. Expected \"none\" or "
+ "\"lookup\", got \""
+ << fullDocOption
+ << "\"",
+ fullDocOption == "lookup"_sd || fullDocOption == "none"_sd);
+ const bool shouldLookupPostImage = (fullDocOption == "lookup"_sd);
+
+ auto oplogMatch = DocumentSourceOplogMatch::create(
+ buildMatchFilter(expCtx->ns, startFrom, changeStreamIsResuming), expCtx);
auto transformation = createTransformationStage(elem.embeddedObject(), expCtx);
list<intrusive_ptr<DocumentSource>> stages = {oplogMatch, transformation};
+ if (resumeStage) {
+ stages.push_back(resumeStage);
+ }
if (shouldLookupPostImage) {
stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
}
diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h
index ad10a5ad210..036c958888f 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.h
+++ b/src/mongo/db/pipeline/document_source_change_notification.h
@@ -30,6 +30,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
namespace mongo {
@@ -119,7 +120,7 @@ public:
* Produce the BSON object representing the filter for the $match stage to filter oplog entries
* to only those relevant for this $changeNotification stage.
*/
- static BSONObj buildMatchFilter(const NamespaceString& nss);
+ static BSONObj buildMatchFilter(const NamespaceString& nss, Timestamp startFrom, bool isResume);
/**
* Parses a $changeNotification stage from 'elem' and produces the $match and transformation
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index 9c08bd91637..ffc9460c96d 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/pipeline/value.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -65,9 +66,15 @@ static const Timestamp ts(100, 1);
static const repl::OpTime optime(ts, 1);
static const NamespaceString nss("unittests.change_notification");
+using ChangeNotificationStageTestNoSetup = AggregationContextFixture;
+
class ChangeNotificationStageTest : public AggregationContextFixture {
public:
- ChangeNotificationStageTest() : AggregationContextFixture(nss) {}
+ ChangeNotificationStageTest() : AggregationContextFixture(nss) {
+ repl::ReplicationCoordinator::set(getExpCtx()->opCtx->getServiceContext(),
+ stdx::make_unique<repl::ReplicationCoordinatorMock>(
+ getExpCtx()->opCtx->getServiceContext()));
+ }
void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) {
const auto spec = fromjson("{$changeNotification: {}}");
@@ -106,19 +113,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedOption) {
BSON(DSChangeNotification::kStageName << BSON("unexpected" << 4)).firstElement(),
expCtx),
UserException,
- 40577);
-}
-
-TEST_F(ChangeNotificationStageTest, ShouldRejectResumeAfterOption) {
- // TODO SERVER-29131 change this test to accept the option.
- auto expCtx = getExpCtx();
-
- ASSERT_THROWS_CODE(
- DSChangeNotification::createFromBson(
- BSON(DSChangeNotification::kStageName << BSON("resumeAfter" << ts)).firstElement(),
- expCtx),
- UserException,
- 40576);
+ 40415);
}
TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) {
@@ -129,7 +124,7 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) {
BSON(DSChangeNotification::kStageName << BSON("fullDocument" << true)).firstElement(),
expCtx),
UserException,
- 40574);
+ ErrorCodes::TypeMismatch);
}
TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) {
@@ -144,6 +139,15 @@ TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption)
40575);
}
+TEST_F(ChangeNotificationStageTestNoSetup, FailsWithNoReplicationCoordinator) {
+ const auto spec = fromjson("{$changeNotification: {}}");
+
+ ASSERT_THROWS_CODE(
+ DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()),
+ UserException,
+ 40573);
+}
+
TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
const auto spec = fromjson("{$changeNotification: {}}");
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.cpp b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
new file mode 100644
index 00000000000..beaa8844dcb
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.cpp
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
+
+using boost::intrusive_ptr;
+namespace mongo {
+const char* DocumentSourceCheckResumeToken::getSourceName() const {
+ return "$_checkResumeToken";
+}
+
+Value DocumentSourceCheckResumeToken::serialize(
+ boost::optional<ExplainOptions::Verbosity> explain) const {
+ // This stage is created by the DocumentSourceChangeNotification stage, so serializing it here
+ // would result in it being created twice.
+ return Value();
+}
+
+intrusive_ptr<DocumentSourceCheckResumeToken> DocumentSourceCheckResumeToken::create(
+ const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec) {
+ return new DocumentSourceCheckResumeToken(expCtx, std::move(spec));
+}
+
+DocumentSourceCheckResumeToken::DocumentSourceCheckResumeToken(
+ const intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceCheckResumeTokenSpec spec)
+ : DocumentSource(expCtx), _token(spec.getResumeToken()), _seenDoc(false) {}
+
+DocumentSource::GetNextResult DocumentSourceCheckResumeToken::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ auto nextInput = pSource->getNext();
+ uassert(40584,
+ "resume of change notification was not possible, as no change data was found. ",
+ _seenDoc || !nextInput.isEOF());
+
+ if (_seenDoc || !nextInput.isAdvanced())
+ return nextInput;
+
+ _seenDoc = true;
+ auto doc = nextInput.getDocument();
+
+ ResumeToken receivedToken(doc["_id"]);
+ uassert(
+ 40585,
+ str::stream()
+ << "resume of change notification was not possible, as the resume token was not found. "
+ << receivedToken.toDocument().toString(),
+ receivedToken == _token);
+ // Don't return the document which has the token; the user has already seen it.
+ return pSource->getNext();
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
new file mode 100644
index 00000000000..13706b937b9
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+#include "mongo/db/pipeline/resume_token.h"
+
+namespace mongo {
+
+/**
+ * This stage is used internally for change notifications to ensure that the resume token is in the
+ * stream. It is not intended to be created by the user.
+ */
+class DocumentSourceCheckResumeToken final : public DocumentSource,
+ public SplittableDocumentSource {
+public:
+ GetNextResult getNext() final;
+ const char* getSourceName() const final;
+
+ /**
+ * SplittableDocumentSource methods; this has to run on the merger, since the resume point could
+ * be at any shard.
+ */
+ boost::intrusive_ptr<DocumentSource> getShardSource() final {
+ return nullptr;
+ };
+ boost::intrusive_ptr<DocumentSource> getMergeSource() final {
+ return this;
+ };
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final;
+
+ static boost::intrusive_ptr<DocumentSourceCheckResumeToken> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ DocumentSourceCheckResumeTokenSpec spec);
+
+ const ResumeToken& getTokenForTest() {
+ return _token;
+ }
+
+private:
+ /**
+ * Use the create static method to create a DocumentSourceCheckResumeToken.
+ */
+ DocumentSourceCheckResumeToken(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ DocumentSourceCheckResumeTokenSpec spec);
+
+ ResumeToken _token;
+ bool _seenDoc;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
new file mode 100644
index 00000000000..a9f057ee353
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_check_resume_token_test.cpp
@@ -0,0 +1,191 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <memory>
+
+#include "mongo/bson/bsonelement.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_check_resume_token.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/resume_token.h"
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+
+using boost::intrusive_ptr;
+
+namespace mongo {
+namespace {
+static constexpr StringData kTestNs = "test.ns"_sd;
+
+class CheckResumeTokenTest : public AggregationContextFixture {
+public:
+ CheckResumeTokenTest() : _mock(DocumentSourceMock::create()) {}
+
+protected:
+ /**
+ * Puts an arbitrary document with resume token corresponding to the given timestamp, id, and
+ * namespace in the mock queue.
+ */
+ void addDocument(Timestamp ts, std::string id, StringData ns = kTestNs) {
+ _mock->queue.push_back(
+ Document({{"_id", Document({{"ts", ts}, {"ns", ns}, {"_id", id}})}}));
+ }
+
+ void addPause() {
+ _mock->queue.push_back(DocumentSource::GetNextResult::makePauseExecution());
+ }
+
+ /**
+ * Convenience method to create the class under test with a given timestamp, id, and namespace.
+ */
+ intrusive_ptr<DocumentSourceCheckResumeToken> createCheckResumeToken(Timestamp ts,
+ StringData id,
+ StringData ns = kTestNs) {
+ auto token = ResumeToken::parse(BSON("ts" << ts << "_id" << id << "ns" << ns));
+ DocumentSourceCheckResumeTokenSpec spec;
+ spec.setResumeToken(token);
+ auto checkResumeToken = DocumentSourceCheckResumeToken::create(getExpCtx(), spec);
+ checkResumeToken->setSource(_mock.get());
+ return checkResumeToken;
+ }
+
+private:
+ intrusive_ptr<DocumentSourceMock> _mock;
+};
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithOnlyResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ addDocument(resumeTimestamp, "1");
+ // We should not see the resume token.
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesBeforeResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ addPause();
+ addDocument(resumeTimestamp, "1");
+
+ // We see the pause we inserted, but not the resume token.
+ ASSERT_TRUE(checkResumeToken->getNext().isPaused());
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithPausesAfterResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+ Timestamp doc1Timestamp(100, 2);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+ addDocument(resumeTimestamp, "1");
+ addPause();
+ addDocument(doc1Timestamp, "2");
+
+ // Pause added explicitly.
+ ASSERT_TRUE(checkResumeToken->getNext().isPaused());
+ // The document after the resume token should be the first.
+ auto result1 = checkResumeToken->getNext();
+ ASSERT_TRUE(result1.isAdvanced());
+ auto& doc1 = result1.getDocument();
+ ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]);
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldSucceedWithMultipleDocumentsAfterResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ addDocument(resumeTimestamp, "0");
+
+ Timestamp doc1Timestamp(100, 2);
+ Timestamp doc2Timestamp(101, 1);
+ addDocument(doc1Timestamp, "1");
+ addDocument(doc2Timestamp, "2");
+
+ auto result1 = checkResumeToken->getNext();
+ ASSERT_TRUE(result1.isAdvanced());
+ auto& doc1 = result1.getDocument();
+ ASSERT_VALUE_EQ(Value(doc1Timestamp), doc1["_id"].getDocument()["ts"]);
+ auto result2 = checkResumeToken->getNext();
+ ASSERT_TRUE(result2.isAdvanced());
+ auto& doc2 = result2.getDocument();
+ ASSERT_VALUE_EQ(Value(doc2Timestamp), doc2["_id"].getDocument()["ts"]);
+ ASSERT_TRUE(checkResumeToken->getNext().isEOF());
+}
+
+TEST_F(CheckResumeTokenTest, ShouldFailIfFirstDocHasWrongResumeToken) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1");
+
+ Timestamp doc1Timestamp(100, 2);
+ Timestamp doc2Timestamp(101, 1);
+ addDocument(doc1Timestamp, "1");
+ addDocument(doc2Timestamp, "2");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongDocumentId) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ addDocument(resumeTimestamp, "1");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585);
+}
+
+TEST_F(CheckResumeTokenTest, ShouldFailIfTokenHasWrongNamespace) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "1", "test1.ns");
+ addDocument(resumeTimestamp, "1", "test2.ns");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40585);
+}
+
+/**
+ * We should _error_ on the no-document case, because that means the resume token was not found.
+ */
+TEST_F(CheckResumeTokenTest, ShouldFailWithNoDocuments) {
+ Timestamp resumeTimestamp(100, 1);
+
+ auto checkResumeToken = createCheckResumeToken(resumeTimestamp, "0");
+ ASSERT_THROWS_CODE(checkResumeToken->getNext(), UserException, 40584);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
new file mode 100644
index 00000000000..de67f25bb02
--- /dev/null
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -0,0 +1,103 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the GNU Affero General Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+
+# Document source pipeline stage IDL file
+
+global:
+ cpp_namespace: "mongo"
+ cpp_includes:
+ - "mongo/db/pipeline/resume_token.h"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+types:
+ # A resume token could be parsed as a struct, but since we may make it opaque in the future, we
+ # parse it as a type with a custom class now.
+ resumeToken:
+ bson_serialization_type: object
+ description: An object representing a resume token for change notification
+ cpp_type: ResumeToken
+ serializer: ResumeToken::toBSON
+ deserializer: ResumeToken::parse
+
+ # The _id element in a resume token can be any BSON element, so we need a custom type which
+ # leaves it as a BSONElement
+ resumeTokenOpaqueId:
+ bson_serialization_type: any
+ description: The document id contained within a resume token
+ cpp_type: Value
+ serializer: Value::serializeForIDL
+ deserializer: Value::deserializeForIDL
+
+structs:
+ DocumentSourceChangeNotificationSpec:
+ description: A document used to specify the $changeNotification stage of an aggregation
+ pipeline.
+ fields:
+ resumeAfter:
+ cpp_name: resumeAfter
+ type: resumeToken
+ optional: true
+ description: An object representing the point at which we should resume reporting
+ changes from.
+ fullDocument:
+ cpp_name: fullDocument
+ type: string
+ default: '"none"'
+ description: A string '"lookup"' or '"none"', indicating whether or not we should
+ return a full document or just changes for an update.
+
+
+ DocumentSourceCheckResumeTokenSpec:
+ description: A document used to specify the internal stage which checks the presence of the
+ resume token.
+ fields:
+ resumeToken:
+ cpp_name: resumeToken
+ type: resumeToken
+ description: The resume token which is required to be present in the pipeline.
+
+
+ ResumeTokenInternal:
+ description: The internal format of a resume token. For use by the ResumeToken class
+ only.
+ fields:
+ ts:
+ cpp_name: timestamp
+ type: timestamp
+ description: The timestamp of the oplog entry represented by this resume token.
+
+ ns:
+ cpp_name: ns
+ type: string
+ description: The namespace of the oplog entry represented by this resume token.
+
+ _id:
+ cpp_name: documentId
+ type: resumeTokenOpaqueId
+ description: The document key of the document in the oplog entry represented by this
+ resume token.
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 9909895feb3..91d04bdf2a2 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_interface_mock.h"
#include "mongo/db/query/query_test_service_context.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/dbtests/dbtests.h"
namespace PipelineTests {
@@ -58,6 +59,14 @@ using std::vector;
const NamespaceString kTestNss = NamespaceString("a.collection");
+namespace {
+void setMockReplicationCoordinatorOnOpCtx(OperationContext* opCtx) {
+ repl::ReplicationCoordinator::set(
+ opCtx->getServiceContext(),
+ stdx::make_unique<repl::ReplicationCoordinatorMock>(opCtx->getServiceContext()));
+}
+} // namespace
+
namespace Optimizations {
using namespace mongo;
@@ -974,6 +983,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupSwapsWithIndependentMatch
intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
expCtx->opCtx = opCtx.get();
+ setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
auto spec = BSON("$changeNotification" << BSON("fullDocument"
<< "lookup"));
@@ -998,6 +1008,7 @@ TEST(PipelineOptimizationTest, ChangeNotificationLookupDoesNotSwapWithMatchOnPos
intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
expCtx->opCtx = opCtx.get();
+ setMockReplicationCoordinatorOnOpCtx(expCtx->opCtx);
auto spec = BSON("$changeNotification" << BSON("fullDocument"
<< "lookup"));
@@ -1472,6 +1483,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles
TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsValidAsFirstStage) {
const std::vector<BSONObj> rawPipeline = {fromjson("{$changeNotification: {}}")};
auto ctx = getExpCtx();
+ setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
}
@@ -1480,6 +1492,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStage)
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
fromjson("{$changeNotification: {}}")};
auto ctx = getExpCtx();
+ setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
auto parseStatus = Pipeline::parse(rawPipeline, ctx).getStatus();
ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
@@ -1490,6 +1503,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeNotificationIsNotValidIfNotFirstStageI
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
fromjson("{$changeNotification: {}}")};
auto ctx = getExpCtx();
+ setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
ctx->ns = NamespaceString("a.collection");
auto parseStatus = Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus();
ASSERT_EQ(parseStatus, ErrorCodes::BadValue);
diff --git a/src/mongo/db/pipeline/resume_token.cpp b/src/mongo/db/pipeline/resume_token.cpp
new file mode 100644
index 00000000000..5b409f1f113
--- /dev/null
+++ b/src/mongo/db/pipeline/resume_token.cpp
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/resume_token.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+#include "mongo/db/pipeline/value_comparator.h"
+
+namespace mongo {
+
+ResumeToken::ResumeToken(const BSONObj& resumeBson) {
+ auto token = ResumeTokenInternal::parse(
+ IDLParserErrorContext("$changeNotification.resumeAfter"), resumeBson);
+ _timestamp = token.getTimestamp();
+ _namespace = token.getNs().toString();
+ _documentId = token.getDocumentId();
+}
+
+ResumeToken::ResumeToken(const Value& resumeValue) {
+ Document resumeTokenDoc = resumeValue.getDocument();
+ Value timestamp = resumeTokenDoc[ResumeTokenInternal::kTimestampFieldName];
+ _timestamp = timestamp.getTimestamp();
+ Value ns = resumeTokenDoc[ResumeTokenInternal::kNsFieldName];
+ _namespace = ns.getString();
+ _documentId = resumeTokenDoc[ResumeTokenInternal::kDocumentIdFieldName];
+}
+
+bool ResumeToken::operator==(const ResumeToken& other) {
+ return _timestamp == other._timestamp && _namespace == other._namespace &&
+ ValueComparator::kInstance.evaluate(_documentId == other._documentId);
+}
+
+Document ResumeToken::toDocument() const {
+ return Document({{ResumeTokenInternal::kTimestampFieldName, _timestamp},
+ {{ResumeTokenInternal::kNsFieldName}, _namespace},
+ {{ResumeTokenInternal::kDocumentIdFieldName}, _documentId}});
+}
+
+BSONObj ResumeToken::toBSON() const {
+ return BSON(
+ ResumeTokenInternal::kTimestampFieldName << _timestamp << ResumeTokenInternal::kNsFieldName
+ << _namespace
+ << ResumeTokenInternal::kDocumentIdFieldName
+ << _documentId);
+}
+
+ResumeToken ResumeToken::parse(const BSONObj& resumeBson) {
+ return ResumeToken(resumeBson);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/resume_token.h b/src/mongo/db/pipeline/resume_token.h
new file mode 100644
index 00000000000..733e285e599
--- /dev/null
+++ b/src/mongo/db/pipeline/resume_token.h
@@ -0,0 +1,75 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/string_data.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/pipeline/value.h"
+
+namespace mongo {
+/**
+ * A token passed in by the user to indicate where in the oplog we should start for
+ * $changeNotification.
+ */
+class ResumeToken {
+public:
+ /**
+ * The default no-argument constructor is required by the IDL for types used as non-optional
+ * fields.
+ */
+ ResumeToken() = default;
+ explicit ResumeToken(const Value& resumeValue);
+ bool operator==(const ResumeToken&);
+
+ Timestamp getTimestamp() const {
+ return _timestamp;
+ }
+
+ Document toDocument() const;
+
+ BSONObj toBSON() const;
+
+ /**
+ * Parse a resume token from a BSON object; used as an interface to the IDL parser.
+ */
+ static ResumeToken parse(const BSONObj& obj);
+
+private:
+ /**
+ * Construct from a BSON object.
+ * External callers should use the static ResumeToken::parse(const BSONObj&) method instead.
+ */
+ explicit ResumeToken(const BSONObj& resumeBson);
+
+ Timestamp _timestamp;
+ std::string _namespace;
+ Value _documentId;
+};
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/value.cpp b/src/mongo/db/pipeline/value.cpp
index d70c6b65cc9..a7bce3fc777 100644
--- a/src/mongo/db/pipeline/value.cpp
+++ b/src/mongo/db/pipeline/value.cpp
@@ -1323,4 +1323,17 @@ Value Value::deserializeForSorter(BufReader& buf, const SorterDeserializeSetting
}
verify(false);
}
+
+void Value::serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const {
+ addToBsonObj(builder, fieldName);
+}
+
+void Value::serializeForIDL(BSONArrayBuilder* builder) const {
+ addToBsonArray(builder);
}
+
+Value Value::deserializeForIDL(const BSONElement& element) {
+ return Value(element);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h
index 04e396dc58d..40c05d89b9a 100644
--- a/src/mongo/db/pipeline/value.h
+++ b/src/mongo/db/pipeline/value.h
@@ -328,6 +328,11 @@ public:
return *this;
}
+ /// Members to support parsing/deserialization from IDL generated code.
+ void serializeForIDL(StringData fieldName, BSONObjBuilder* builder) const;
+ void serializeForIDL(BSONArrayBuilder* builder) const;
+ static Value deserializeForIDL(const BSONElement& element);
+
private:
/** This is a "honeypot" to prevent unexpected implicit conversions to the accepted argument
* types. bool is especially bad since without this it will accept any pointer.