summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/oplog_rollover_agg.js
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2021-01-21 23:06:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-15 16:18:24 +0000
commit394572f340bd06f5306a2e961cd9d4e0d3e96bb1 (patch)
tree85d6b30dcb8e142676192da192f7df70d27f58b7 /jstests/noPassthrough/oplog_rollover_agg.js
parent16aff18cbb6b993ac325a50a59b1898d35cf08c3 (diff)
downloadmongo-394572f340bd06f5306a2e961cd9d4e0d3e96bb1.tar.gz
SERVER-50580 SBE should obey ASSERT_MIN_TS_HAS_NOT_FALLEN_OFF_OPLOG flag
Diffstat (limited to 'jstests/noPassthrough/oplog_rollover_agg.js')
-rw-r--r--jstests/noPassthrough/oplog_rollover_agg.js134
1 files changed, 134 insertions, 0 deletions
diff --git a/jstests/noPassthrough/oplog_rollover_agg.js b/jstests/noPassthrough/oplog_rollover_agg.js
new file mode 100644
index 00000000000..c41ebb2fedd
--- /dev/null
+++ b/jstests/noPassthrough/oplog_rollover_agg.js
@@ -0,0 +1,134 @@
+// Tests the behaviour of an agg with $_requestReshardingResumeToken on an oplog which rolls over.
+// @tags: [
+// requires_find_command,
+// requires_journaling,
+// requires_majority_read_concern,
+// ]
+(function() {
+"use strict";
+
+load('jstests/replsets/rslib.js'); // For getLatestOp.
+
+const oplogSize = 1; // size in MB
+const rst = new ReplSetTest({nodes: 1, oplogSize: oplogSize});
+
+rst.startSet();
+rst.initiate();
+
+const testDB = rst.getPrimary().getDB(jsTestName());
+const testColl = testDB[jsTestName()];
+
+const localDB = testDB.getSiblingDB("local");
+const oplogColl = localDB.oplog.rs;
+
+// Insert one document into the test collection.
+const insertCmdRes = assert.commandWorked(testDB.runCommand(
+ {insert: testColl.getName(), documents: [{_id: 1}], writeConcern: {w: "majority"}}));
+
+// Record the optime of the insert to resume from later in the test.
+const resumeTimeFirstInsert = insertCmdRes.operationTime;
+
+// Update the document to create another oplog entry.
+assert.commandWorked(testColl.update({_id: 1}, {$set: {updated: true}}));
+
+// Verify that an aggregation which requests a resharding resume token but does not include a filter
+// on 'ts' is rejected.
+assert.commandFailedWithCode(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}),
+ ErrorCodes.InvalidOptions);
+
+// Verify that we can start an aggregation from the timestamp that we took earlier, and that we see
+// the subsequent update operation.
+let aggCmdRes = assert.commandWorked(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gt: resumeTimeFirstInsert}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}));
+
+const aggCmdCursor = new DBCommandCursor(localDB, aggCmdRes);
+assert.soon(() => aggCmdCursor.hasNext());
+let next = aggCmdCursor.next();
+assert.eq(next.op, "u");
+assert.eq(next.o2._id, 1);
+
+// Confirm that we can begin an aggregation at a timestamp that precedes the start of the oplog, if
+// the first entry in the oplog is the replica set initialization message.
+const firstOplogEntry =
+ testDB.getSiblingDB("local").oplog.rs.find().sort({$natural: 1}).limit(1).toArray()[0];
+assert.eq(firstOplogEntry.o.msg, "initiating set");
+assert.eq(firstOplogEntry.op, "n");
+
+aggCmdRes = assert.commandWorked(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}));
+const startAtDawnOfTimeCursor = new DBCommandCursor(localDB, aggCmdRes);
+
+for (let expectedOp of [{op: "i", _id: 1}, {op: "u", _id: 1}]) {
+ assert.soon(() => startAtDawnOfTimeCursor.hasNext());
+ next = startAtDawnOfTimeCursor.next();
+ assert.eq(next.op, expectedOp.op);
+ assert.eq((next.o._id || next.o2._id), expectedOp._id);
+}
+
+// Roll over the entire oplog such that none of the events are still present.
+const primaryNode = rst.getPrimary();
+const mostRecentOplogEntry = getLatestOp(primaryNode);
+assert.neq(mostRecentOplogEntry, null);
+const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi');
+
+function oplogIsRolledOver() {
+ // The oplog has rolled over if the op that used to be newest is now older than the
+ // oplog's current oldest entry. Said another way, the oplog is rolled over when
+ // everything in the oplog is newer than what used to be the newest entry.
+ return bsonWoCompare(mostRecentOplogEntry.ts,
+ getLeastRecentOp({server: primaryNode, readConcern: "majority"}).ts) < 0;
+}
+
+while (!oplogIsRolledOver()) {
+ assert.commandWorked(testColl.insert({long_str: largeStr}, {writeConcern: {w: "majority"}}));
+}
+
+// Test that attempting to start from a timestamp that has already fallen off the oplog fails, if we
+// specify $_requestReshardingResumeToken.
+assert.commandFailedWithCode(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: resumeTimeFirstInsert}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}),
+ ErrorCodes.OplogQueryMinTsMissing);
+
+assert.commandFailedWithCode(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}],
+ cursor: {},
+ $_requestReshardingResumeToken: true
+}),
+ ErrorCodes.OplogQueryMinTsMissing);
+
+// However, the same aggregation succeeds if we do not specify $_requestReshardingResumeToken.
+assert.commandWorked(localDB.runCommand({
+ aggregate: oplogColl.getName(),
+ pipeline: [{$match: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()}}],
+ cursor: {},
+}));
+
+// Requesting resume tokens on a find command does not imply 'assertMinTsHasNotFallenOffOplog'.
+assert.commandWorked(localDB.runCommand({
+ find: oplogColl.getName(),
+ filter: {ts: {$gte: Timestamp(1, 1)}, ns: testColl.getFullName()},
+ tailable: true,
+ hint: {$natural: 1},
+ $_requestResumeToken: true
+}));
+
+rst.stopSet();
+})();