// path: root/jstests/noPassthrough/set_window_fields_read_concern_snapshot.js
// blob: b8277043293a1e0ce3ca18686a381be8bca6976b
/**
 * Test that $setWindowFields succeeds if it needs to spill to disk with readConcern snapshot and in
 * transactions.
 * @tags: [
 *   requires_replication,
 *   uses_transactions,
 *   uses_snapshot_read_concern,
 * ]
 */
(function() {
"use strict";

load("jstests/noPassthrough/libs/server_parameter_helpers.js");  // For setParameterOnAllHosts.
load("jstests/libs/discover_topology.js");                       // For findNonConfigNodes.
load("jstests/aggregation/extras/utils.js");                     // arrayEq.
load("jstests/libs/profiler.js");                                // getLatestProfileEntry.

// Start a two-node replica set and grab a connection to its primary.
const rst = new ReplSetTest({nodes: 2});
rst.startSet();
rst.initiate();
const rstPrimary = rst.getPrimary();

// Use a database and a collection named after this test, and make sure the collection starts
// out empty.
const testDB = rstPrimary.getDB(`${jsTestName()}_db`);
const coll = testDB.getCollection(`${jsTestName()}_coll`);
coll.drop();

/**
 * Asserts that the latest profiler entry with 'usedDisk: true' describes an aggregation whose
 * first pipeline stage is $setWindowFields or $lookup.
 *
 * The pipeline is read from the entry's 'originatingCommand' field when present, otherwise from
 * its 'command' field. Fails with a descriptive assertion if neither field exists, if the
 * chosen command has no non-empty 'pipeline' array, or if the first stage is something else.
 *
 * @param dbToCheck - database handle handed to getLatestProfilerEntry().
 */
function checkProfilerForDiskWrite(dbToCheck) {
    const profileObj = getLatestProfilerEntry(dbToCheck, {usedDisk: true});

    // 'originatingCommand' takes precedence over 'command', so list it first.
    const commandField =
        ["originatingCommand", "command"].find(field => profileObj.hasOwnProperty(field));
    assert(commandField !== undefined, "Profiler should have had command field", profileObj);

    const stages = profileObj[commandField].pipeline;
    assert(Array.isArray(stages) && stages.length > 0,
           "Expected profiler entry to carry a non-empty pipeline: " + tojson(profileObj));

    // Verify that this was a $setWindowFields stage (possibly wrapped in a $lookup) as expected.
    const firstStage = stages[0];
    assert(firstStage.hasOwnProperty("$setWindowFields") || firstStage.hasOwnProperty("$lookup"),
           "Expected first stage to be $setWindowFields or $lookup, found: " + tojson(firstStage));
}
// Seed two partitions of 30 documents each. For every 'val' in 0..29 the partition-1 document
// (_id == val) is queued first, followed by the partition-2 document (_id == val + 30).
const documents = [];
for (let val = 0; val < 30; ++val) {
    for (const partition of [1, 2]) {
        documents.push({_id: val + 30 * (partition - 1), val: val, partition: partition});
    }
}
assert.commandWorked(coll.insert(documents));

// Lower the $setWindowFields memory limit to 1500 bytes on every non-config node reachable from
// this connection.
const nonConfigNodes = DiscoverTopology.findNonConfigNodes(testDB.getMongo());
setParameterOnAllHosts(
    nonConfigNodes, "internalDocumentSourceSetWindowFieldsMaxMemoryBytes", 1500);

// Timestamp of the last committed optime reported by the replica set status; the snapshot reads
// below pass it as 'atClusterTime'.
const rsStatus = rst.status();
const {ts: lastClusterTime} = rsStatus.optimes.lastCommittedOpTime;

// Bounds of the documents window used by the $setWindowFields stage.
const [lowerBound, upperBound] = [-21, 21];
// Pipeline under test: per partition, sum 'val' over a documents window of
// [lowerBound, upperBound] around each document, then order the output by 'val'.
const pipeline = [
    {
        $setWindowFields: {
            partitionBy: "$partition",
            // Sort by 'val' inside each partition. Sorting by 'partition' would make every
            // document of a partition tie, leaving the window order (and therefore the sums
            // the test expects) unspecified.
            sortBy: {val: 1},
            output: {sum: {$sum: "$val", window: {documents: [lowerBound, upperBound]}}}
        }
    },
    {$sort: {val: 1}},
];

// Aggregate with readConcern snapshot at 'lastClusterTime', allowing the stage to use disk.
// This variable is reassigned for the later scenarios, so it stays 'let'.
let aggregationCommand = {
    aggregate: coll.getName(),
    pipeline: pipeline,
    allowDiskUse: true,
    readConcern: {level: "snapshot", atClusterTime: lastClusterTime},
    cursor: {}
};

/**
 * Empties the profiler collection of 'testDB' and re-enables profiling at level 2.
 *
 * Profiling is set to level 0 before system.profile is dropped, presumably because the
 * collection cannot be dropped while profiling is active (confirm against server docs).
 */
function resetProfiler() {
    const levelWhileDropping = 0;
    const levelForTest = 2;

    testDB.setProfilingLevel(levelWhileDropping);
    testDB.system.profile.drop();
    testDB.setProfilingLevel(levelForTest);
}

// Run outside of a transaction and confirm through the profiler that the command used disk.
resetProfiler();
const commandResult = assert.commandWorked(testDB.runCommand(aggregationCommand));
checkProfilerForDiskWrite(testDB);
const arrayResult = commandResult.cursor.firstBatch;

// Build the expected output. Both partitions hold the same 'val' sequence 0..N-1, so one running
// window sum serves both. The computation assumes lowerBound <= 0 <= upperBound.
const expected = [];
const docsPerPartition = documents.length / 2;

// The window of the first document spans val 0..min(upperBound, N-1); its sum is the arithmetic
// series 0 + 1 + ... + firstWindowEnd.
const firstWindowEnd = Math.min(upperBound, docsPerPartition - 1);
let curSum = firstWindowEnd * (firstWindowEnd + 1) / 2;
for (let i = 0; i < docsPerPartition; i++) {
    expected.push({_id: i, val: i, partition: 1, sum: curSum});
    expected.push({_id: i + docsPerPartition, val: i, partition: 2, sum: curSum});
    // Slide the window to the next document. The document leaving at the front has val
    // i + lowerBound (lowerBound is negative, hence the addition); nothing leaves while that is
    // still below zero.
    curSum -= Math.max(0, i + lowerBound);
    // The document entering at the back has val i + upperBound + 1, if the partition has one.
    if (i + upperBound + 1 < docsPerPartition) {
        curSum += i + upperBound + 1;
    }
}
assertArrayEq({actual: arrayResult, expected: expected});

// A $setWindowFields nested in a $lookup subpipeline must also succeed with readConcern
// snapshot. The subpipeline is the same 'pipeline' that was run directly above.
const lookupStage = {$lookup: {from: coll.getName(), pipeline: pipeline, as: "newField"}};
const lookupPipeline = [lookupStage];
aggregationCommand = {
    aggregate: coll.getName(),
    pipeline: lookupPipeline,
    allowDiskUse: true,
    readConcern: {level: "snapshot", atClusterTime: lastClusterTime},
    cursor: {}
};

// The same $setWindowFields has already had its results verified, so from here on it is enough
// that the command succeeds (i.e. the server does not crash).
const lookupResult = testDB.runCommand(aggregationCommand);
assert.commandWorked(lookupResult);

// Repeat in a transaction. Don't check for disk writes, as the profiler can't be queried in a
// transaction.
const session = rstPrimary.startSession();
session.startTransaction({readConcern: {level: "snapshot"}});
const sessionDB = session.getDatabase(testDB.getName());
const sessionColl = sessionDB.getCollection(coll.getName());
// No per-command readConcern here: the transaction was started with readConcern snapshot.
aggregationCommand = {
    aggregate: coll.getName(),
    pipeline: pipeline,
    allowDiskUse: true,
    cursor: {},
};
assert.commandWorked(sessionColl.runCommand(aggregationCommand));

// Restart the transaction so the subpipeline variant runs in a fresh one.
session.abortTransaction();
session.startTransaction({readConcern: {level: "snapshot"}});
// Repeat the subpipeline test in a transaction.
aggregationCommand = {
    aggregate: coll.getName(),
    pipeline: lookupPipeline,
    allowDiskUse: true,
    cursor: {}
};
assert.commandWorked(sessionColl.runCommand(aggregationCommand));
session.abortTransaction();

// End the session explicitly before tearing the replica set down, instead of leaving it open.
session.endSession();
rst.stopSet();
})();