summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorA. Jesse Jiryu Davis <jesse@mongodb.com>2020-04-28 12:46:31 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-28 17:00:52 +0000
commit846cd7c189350edffecfe260e149571927d19d95 (patch)
tree2b11c3663a47f70afdfe365d215a90026da97d2c
parentbbc0199f6084ac56dc5ff28f41a277d0d678540b (diff)
downloadmongo-846cd7c189350edffecfe260e149571927d19d95.tar.gz
SERVER-47576 Include atClusterTime in snapshot read reply
-rw-r--r--jstests/libs/global_snapshot_reads_util.js42
-rw-r--r--jstests/replsets/non_transaction_snapshot_reads.js6
-rw-r--r--src/mongo/db/commands/find_cmd.cpp1
-rw-r--r--src/mongo/db/commands/getmore_cmd.cpp5
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp1
-rw-r--r--src/mongo/db/query/cursor_response.cpp4
-rw-r--r--src/mongo/db/query/cursor_response.h1
7 files changed, 44 insertions, 16 deletions
diff --git a/jstests/libs/global_snapshot_reads_util.js b/jstests/libs/global_snapshot_reads_util.js
index 0286f12bacd..99ef093a06b 100644
--- a/jstests/libs/global_snapshot_reads_util.js
+++ b/jstests/libs/global_snapshot_reads_util.js
@@ -70,17 +70,6 @@ function snapshotReadsTest(testScenarioName, db, collName) {
`${collName} with read preference ${readPreferenceMode} and causal` +
` consistency ${useCausalConsistency}`);
- let causalDb;
-
- if (useCausalConsistency) {
- let session = db.getMongo().startSession({causalConsistency: true});
- causalDb = session.getDatabase(db.getName());
- // Establish timestamp.
- causalDb[collName].findOne({});
- } else {
- causalDb = db;
- }
-
for (let commandKey in commands) {
assert(commandKey);
jsTestLog("Testing the " + commandKey + " command.");
@@ -93,15 +82,33 @@ function snapshotReadsTest(testScenarioName, db, collName) {
jsTestLog(`Inserted 10 documents at timestamp ${insertTimestamp}`);
+ // Create a session if useCausalConsistency is true.
+ let causalDb, sessionTimestamp;
+
+ if (useCausalConsistency) {
+ let session = db.getMongo().startSession({causalConsistency: true});
+ causalDb = session.getDatabase(db.getName());
+ // Establish timestamp.
+ causalDb["otherCollection"].insertOne({});
+ sessionTimestamp = session.getOperationTime();
+ } else {
+ causalDb = db;
+ }
+
// Establish a snapshot cursor, fetching the first 5 documents.
res = assert.commandWorked(causalDb.runCommand(command(5, readPreferenceMode)));
-
- // TODO(SERVER-47576): Check atClusterTime is present, and >= findOne's timestamp if
- // causal.
assert.sameMembers(res.cursor.firstBatch, docs.slice(0, 5), res);
assert(res.cursor.hasOwnProperty("id"));
const cursorId = res.cursor.id;
assert.neq(cursorId, 0);
+ assert(res.cursor.hasOwnProperty("atClusterTime"));
+ let atClusterTime = res.cursor.atClusterTime;
+ assert.neq(atClusterTime, Timestamp(0, 0));
+ if (useCausalConsistency) {
+ assert.gte(atClusterTime, sessionTimestamp);
+ } else {
+ assert.gte(atClusterTime, insertTimestamp);
+ }
// This update is not visible to reads at insertTimestamp.
res = assert.commandWorked(causalDb.runCommand({
@@ -118,6 +125,7 @@ function snapshotReadsTest(testScenarioName, db, collName) {
// The cursor has been exhausted. The remaining docs don't show updated field.
assert.eq(0, res.cursor.id);
+ assert.eq(atClusterTime, res.cursor.atClusterTime);
assert.sameMembers(res.cursor.nextBatch, docs.slice(5), res);
jsTestLog(`Starting new snapshot read`);
@@ -125,6 +133,9 @@ function snapshotReadsTest(testScenarioName, db, collName) {
// This read shows the updated docs.
res = assert.commandWorked(causalDb.runCommand(command(20, readPreferenceMode)));
assert.eq(0, res.cursor.id);
+ assert(res.cursor.hasOwnProperty("atClusterTime"));
+ // Selected atClusterTime at or after first cursor's atClusterTime.
+ assert.gte(res.cursor.atClusterTime, atClusterTime);
assert.sameMembers(res.cursor.firstBatch,
[...Array(10).keys()].map((i) => ({"_id": i, "x": true})),
res);
@@ -135,6 +146,9 @@ function snapshotReadsTest(testScenarioName, db, collName) {
db.runCommand(command(20, readPreferenceMode, insertTimestamp)));
assert.sameMembers(res.cursor.firstBatch, docs, res);
+ assert.eq(0, res.cursor.id);
+ assert(res.cursor.hasOwnProperty("atClusterTime"));
+ assert.eq(res.cursor.atClusterTime, insertTimestamp);
// Reset.
assert.commandWorked(db[collName].remove({}, {writeConcern: {w: "majority"}}));
diff --git a/jstests/replsets/non_transaction_snapshot_reads.js b/jstests/replsets/non_transaction_snapshot_reads.js
index 9e30d2ce03e..0fdc3f60a51 100644
--- a/jstests/replsets/non_transaction_snapshot_reads.js
+++ b/jstests/replsets/non_transaction_snapshot_reads.js
@@ -20,5 +20,11 @@ replSet.initiateWithHighElectionTimeout();
const primary = replSet.getPrimary();
const testDB = primary.getDB('test');
snapshotReadsTest(jsTestName(), testDB, "test");
+
+// Ensure "atClusterTime" is omitted from a regular (non-snapshot) cursor.
+testDB["collection"].insertOne({});
+const cursor = assert.commandWorked(testDB.runCommand({find: "collection"})).cursor;
+assert(!cursor.hasOwnProperty("atClusterTime"));
+
replSet.stopSet();
})();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index e93d24780ff..d83db1ac6e2 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -499,6 +499,7 @@ public:
// Stream query results, adding them to a BSONArray as we go.
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
+ options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
CursorResponseBuilder firstBatch(result, options);
Document doc;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 93d81dc32db..f183225cd8c 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -577,8 +577,9 @@ public:
});
CursorId respondWithId = 0;
-
- CursorResponseBuilder nextBatch(reply, CursorResponseBuilder::Options());
+ CursorResponseBuilder::Options options;
+ options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
+ CursorResponseBuilder nextBatch(reply, options);
BSONObj obj;
PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
std::uint64_t numResults = 0;
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index aa9be39f4b8..536cca3a865 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -162,6 +162,7 @@ bool handleCursorCommand(OperationContext* opCtx,
CursorResponseBuilder::Options options;
options.isInitialResponse = true;
+ options.atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
CursorResponseBuilder responseBuilder(result, options);
auto curOp = CurOp::get(opCtx);
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index a6b271253bc..3cd99d7e806 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -44,6 +44,7 @@ const char kCursorsField[] = "cursors";
const char kCursorField[] = "cursor";
const char kIdField[] = "id";
const char kNsField[] = "ns";
+const char kAtClusterTimeField[] = "atClusterTime";
const char kBatchField[] = "nextBatch";
const char kBatchFieldInitial[] = "firstBatch";
const char kBatchDocSequenceField[] = "cursor.nextBatch";
@@ -84,6 +85,9 @@ void CursorResponseBuilder::done(CursorId cursorId, StringData cursorNamespace)
}
_cursorObject->append(kIdField, cursorId);
_cursorObject->append(kNsField, cursorNamespace);
+ if (_options.atClusterTime) {
+ _cursorObject->append(kAtClusterTimeField, _options.atClusterTime->asTimestamp());
+ }
_cursorObject.reset();
_bodyBuilder.reset();
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index 509600103e4..5e03d2c5a3e 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -54,6 +54,7 @@ public:
struct Options {
bool isInitialResponse = false;
bool useDocumentSequences = false;
+ boost::optional<LogicalTime> atClusterTime = boost::none;
};
/**