1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
/**
* Test which verifies that $out/$merge aggregations with secondary read preference which write
* over 16 MB work as expected (especially with respect to producing correctly sized write batches).
*
* @tags: [uses_$out, assumes_read_preference_unchanged]
*/
(function() {
const dbName = "db";
const collName = "movies";
const targetCollName = "movies2";
function testFn(db) {
const coll = db[collName];
coll.drop();
db[targetCollName].drop();
// Insert 4 MB more than the maximum bytes allowed in a single write batch worth of data
// serialized as a single BSONObj.
const hello = db.hello();
const maxBatchSize = hello.maxWriteBatchSize;
const totalDataSize = hello.maxBsonObjectSize + (4 * 1024 * 1024);
const sizePerDoc = totalDataSize / maxBatchSize;
const bigString = "a".repeat(sizePerDoc);
const bulk = coll.initializeUnorderedBulkOp();
for (let i = 0; i < maxBatchSize; ++i) {
bulk.insert({_id: NumberInt(i), foo: bigString});
}
assert.commandWorked(bulk.execute({w: "majority"}));
function defaultSetUpFn(db) {
db[targetCollName].drop({writeConcern: {w: "majority"}});
}
function cleanUpFn(db) {
db[targetCollName].drop({writeConcern: {w: "majority"}});
}
function testWriteAggSpec(aggWriteStageSpec, setUpFn, errorCodes = []) {
// Run 'aggWriteStageSpec' with both primary and secondary read preference.
for (const readPref of ["primary", "secondary"]) {
jsTestLog("Testing " + tojson(aggWriteStageSpec) + " with read preference " + readPref);
setUpFn(db);
// If the caller provided some error codes, assert that the command failed with one
// of these codes.
const fn = () =>
db[collName]
.aggregate([aggWriteStageSpec], {$readPreference: {mode: readPref}})
.itcount();
const errMsg = "Failed to run aggregate with read preference " + readPref;
if (errorCodes.length > 0) {
assert.throwsWithCode(fn, errorCodes, [] /* params */, errMsg);
} else {
assert.doesNotThrow(fn, [] /* params */, errMsg);
}
cleanUpFn(db);
}
}
// Set up documents in the output collection so that $merge will perform updates.
function mergeUpdateSetupFn(db) {
defaultSetUpFn(db);
const bulk = db[targetCollName].initializeUnorderedBulkOp();
for (let i = 0; i < maxBatchSize; ++i) {
bulk.insert({_id: NumberInt(i), extraField: i * 3});
}
assert.commandWorked(bulk.execute({w: "majority"}));
}
testWriteAggSpec({$out: targetCollName}, defaultSetUpFn);
testWriteAggSpec(
{$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "insert"}},
defaultSetUpFn);
testWriteAggSpec(
{$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert"}},
defaultSetUpFn);
testWriteAggSpec(
{$merge: {into: targetCollName, whenMatched: "keepExisting", whenNotMatched: "insert"}},
defaultSetUpFn);
testWriteAggSpec(
{$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "insert", on: "_id"}},
mergeUpdateSetupFn);
// Failure cases.
testWriteAggSpec(
{$merge: {into: targetCollName, whenMatched: "replace", whenNotMatched: "fail", on: "_id"}},
defaultSetUpFn,
[ErrorCodes.MergeStageNoMatchingDocument]);
testWriteAggSpec(
{$merge: {into: targetCollName, whenMatched: "merge", whenNotMatched: "fail", on: "_id"}},
defaultSetUpFn,
[ErrorCodes.MergeStageNoMatchingDocument]);
testWriteAggSpec(
{$merge: {into: targetCollName, whenMatched: "fail", whenNotMatched: "insert", on: "_id"}},
mergeUpdateSetupFn,
[ErrorCodes.DuplicateKey]);
}
jsTestLog("Testing against a replica set");
const rst = new ReplSetTest({nodes: 2});
rst.startSet();
rst.initiate();
testFn(new Mongo(rst.getURL()).getDB(dbName));
rst.stopSet();
jsTestLog("Testing against a sharded cluster");
const st = new ShardingTest({shards: 1, rs: {nodes: 2}});
testFn(st.s.getDB(dbName));
st.stop();
}());
|