1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
/**
* Test readCommitted lookup/graphLookup. 'db' must be the test database for either the replica set
* primary or mongos instance. 'secondary' is the shard/replica set secondary. If 'db' is backed
* by a mongos instance then the associated cluster should have only a single shard. 'rst' is the
* ReplSetTest instance associated with the replica set/shard.
*/
load("jstests/libs/write_concern_util.js");
function testReadCommittedLookup(db, secondary, rst) {
/**
* stopServerReplication uses the 'stopReplProducer' fail point to stop server replication on
* the given secondary.
*/
function pauseReplication(sec) {
stopServerReplication(sec);
}
/**
* Turns off the 'stopReplProducer' fail point to resume server replication on the
* given secondary.
*/
function resumeReplication(sec) {
restartServerReplication(sec);
}
const aggCmdLookupObj = {
aggregate: "local",
pipeline: [
{
$lookup: {
from: "foreign",
localField: "foreignKey",
foreignField: "matchedField",
as: "match",
}
},
],
cursor: {},
readConcern: {
level: "majority",
}
};
const aggCmdGraphLookupObj = {
aggregate: "local",
pipeline: [{
$graphLookup: {
from: "foreign",
startWith: '$foreignKey',
connectFromField: 'foreignKey',
connectToField: "matchedField",
as: "match"
}
}],
cursor: {},
readConcern: {
level: "majority",
}
};
// Seed matching data.
const majorityWriteConcernObj = {writeConcern: {w: "majority", wtimeout: 60 * 1000}};
db.local.deleteMany({}, majorityWriteConcernObj);
const localId = db.local.insertOne({foreignKey: "x"}, majorityWriteConcernObj).insertedId;
db.foreign.deleteMany({}, majorityWriteConcernObj);
const foreignId = db.foreign.insertOne({matchedField: "x"}, majorityWriteConcernObj).insertedId;
const expectedMatchedResult = [{
_id: localId,
foreignKey: "x",
match: [
{_id: foreignId, matchedField: "x"},
],
}];
const expectedUnmatchedResult = [{
_id: localId,
foreignKey: "x",
match: [],
}];
// Confirm lookup/graphLookup return the matched result.
let result = db.runCommand(aggCmdLookupObj).cursor.firstBatch;
assert.eq(result, expectedMatchedResult);
result = db.runCommand(aggCmdGraphLookupObj).cursor.firstBatch;
assert.eq(result, expectedMatchedResult);
// Stop oplog application on the secondary so that it won't acknowledge updates.
pauseReplication(secondary);
// Update foreign data to no longer match, without a majority write concern.
db.foreign.updateOne({_id: foreignId}, {$set: {matchedField: "non-match"}});
// lookup/graphLookup should not see the update, since it has not been acknowledged by the
// secondary.
result = db.runCommand(aggCmdLookupObj).cursor.firstBatch;
assert.eq(result, expectedMatchedResult);
result = db.runCommand(aggCmdGraphLookupObj).cursor.firstBatch;
assert.eq(result, expectedMatchedResult);
// Restart oplog application on the secondary and wait for it's snapshot to catch up.
resumeReplication(secondary);
rst.awaitLastOpCommitted();
// Now lookup/graphLookup should report that the documents don't match.
result = db.runCommand(aggCmdLookupObj).cursor.firstBatch;
assert.eq(result, expectedUnmatchedResult);
result = db.runCommand(aggCmdGraphLookupObj).cursor.firstBatch;
assert.eq(result, expectedUnmatchedResult);
}
|