1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
// Tests snapshot isolation on readConcern level snapshot reads through mongos.
// @tags: [requires_sharding]
(function() {
"use strict";
load("jstests/libs/global_snapshot_reads_util.js");
const dbName = "test";
const shardedCollName = "shardedColl";
const unshardedCollName = "unshardedColl";
const st = new ShardingTest({shards: 1, mongos: 1, config: 1});
const shardDb = st.rs0.getPrimary().getDB(dbName);
if (!shardDb.serverStatus().storageEngine.supportsSnapshotReadConcern) {
st.stop();
return;
}
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
assert.commandWorked(st.s.adminCommand(
{shardCollection: st.s.getDB(dbName)[shardedCollName] + "", key: {_id: 1}}));
function runTest(mainDb, {useCausalConsistency, collName}) {
const session = mainDb.getMongo().startSession({causalConsistency: useCausalConsistency});
const sessionDb = session.getDatabase(dbName);
const bulk = mainDb[collName].initializeUnorderedBulkOp();
for (let x = 0; x < 10; ++x) {
bulk.insert({_id: x});
}
assert.commandWorked(bulk.execute({w: "majority"}));
let txnNumber = 1;
// Test snapshot reads using aggregate.
let cursorCmd = {
aggregate: collName,
pipeline: [{$sort: {_id: 1}}],
readConcern: {level: "snapshot"},
txnNumber: NumberLong(txnNumber),
cursor: {batchSize: 5}
};
// Establish a snapshot cursor, fetching the first 5 documents.
let res = assert.commandWorked(sessionDb.runCommand(cursorCmd));
assert(res.hasOwnProperty("cursor"));
assert(res.cursor.hasOwnProperty("firstBatch"));
assert.eq(5, res.cursor.firstBatch.length);
assert(res.cursor.hasOwnProperty("id"));
const cursorId = res.cursor.id;
assert.neq(cursorId, 0);
// Insert an 11th document which should not be visible to the snapshot cursor. This write is
// performed outside of the session.
assert.writeOK(mainDb[collName].insert({_id: 10}, {writeConcern: {w: "majority"}}));
verifyInvalidGetMoreAttempts(mainDb, sessionDb, collName, cursorId, txnNumber);
// Fetch the 6th document. This confirms that the transaction stash is preserved across
// multiple getMore invocations.
res = assert.commandWorked(sessionDb.runCommand({
getMore: cursorId,
collection: collName,
batchSize: 1,
txnNumber: NumberLong(txnNumber)
}));
assert(res.hasOwnProperty("cursor"));
assert(res.cursor.hasOwnProperty("id"));
assert.neq(0, res.cursor.id);
// Exhaust the cursor, retrieving the remainder of the result set.
res = assert.commandWorked(sessionDb.runCommand({
getMore: cursorId,
collection: collName,
batchSize: 10,
txnNumber: NumberLong(txnNumber)
}));
// The cursor has been exhausted.
assert(res.hasOwnProperty("cursor"));
assert(res.cursor.hasOwnProperty("id"));
assert.eq(0, res.cursor.id);
// Only the remaining 4 of the initial 10 documents are returned. The 11th document is not
// part of the result set.
assert(res.cursor.hasOwnProperty("nextBatch"));
assert.eq(4, res.cursor.nextBatch.length);
// Perform a second snapshot read under a new transaction.
txnNumber++;
res = assert.commandWorked(sessionDb.runCommand({
aggregate: collName,
pipeline: [{$sort: {_id: 1}}],
cursor: {batchSize: 20},
readConcern: {level: "snapshot"},
txnNumber: NumberLong(txnNumber)
}));
// The cursor has been exhausted.
assert(res.hasOwnProperty("cursor"));
assert(res.cursor.hasOwnProperty("id"));
assert.eq(0, res.cursor.id);
// All 11 documents are returned.
assert(res.cursor.hasOwnProperty("firstBatch"));
assert.eq(11, res.cursor.firstBatch.length);
session.endSession();
}
jsTestLog("Running sharded");
runTest(st.s.getDB(dbName), {useCausalConsistency: false, collName: shardedCollName});
jsTestLog("Running unsharded");
runTest(st.s.getDB(dbName), {useCausalConsistency: false, collName: unshardedCollName});
st.stop();
})();
|