summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/out_merge_on_secondary_metadata.js
blob: 9668c16fd411a709ffed8a8f794e0df14fb1c3d8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/**
 * Test that the impersonated user metadata, client metadata, and cluster time are propagated to the
 * primary when a $out/$merge is executed on a secondary.
 *
 * @tags: [
 *   assumes_unsharded_collection,
 *   requires_replication,
 * ]
 */
(function() {
"use strict";

load("jstests/libs/fail_point_util.js");

// Start a two-node replica set and wait for replication before grabbing connections to each
// node.
const replTest = new ReplSetTest({nodes: 2});
replTest.startSet();
replTest.initiate();
replTest.awaitReplication();

const primary = replTest.getPrimary();
const secondary = replTest.getSecondary();
const primaryDB = primary.getDB("test");
const secondaryDB = secondary.getDB("test");

// Enable profiling at level 2 on both nodes (primary first); testMetadata() later reads the
// primary's system.profile collection to inspect the forwarded commands.
[primaryDB, secondaryDB].forEach(
    (database) => assert.commandWorked(database.setProfilingLevel(2)));
secondaryDB.getMongo().setReadPref("secondary");

const outCollName = "outColl";
const inputCollPrimary = primaryDB.getCollection("inputColl");

// Seed the input collection, waiting for both nodes to acknowledge each document.
const seedDocs = [{_id: 0, a: 1}, {_id: 1, a: 2}];
for (const doc of seedDocs) {
    assert.commandWorked(inputCollPrimary.insert(doc, {writeConcern: {w: 2}}));
}

// Create the user the aggregations authenticate as, then grant it readWrite on "test". The
// user name and role must match what expectedUserAndRoleMetadata describes.
const createUserCmd = {createUser: "testUser", pwd: "pwd", roles: [], writeConcern: {w: 2}};
assert.commandWorked(primaryDB.runCommand(createUserCmd));
const grantedRoles = [{role: "readWrite", db: "test"}];
primaryDB.grantRolesToUser("testUser", grantedRoles, {w: 2});

// The value testMetadata() expects in the '$audit' field of each profiled command on the
// primary.
const expectedUserAndRoleMetadata = {
    $impersonatedUser: {user: "testUser", db: "test"},
    $impersonatedRoles: [{role: "readWrite", db: "test"}],
};

/**
 * Runs 'pipeline' against "inputColl" from a parallel shell connected to the secondary
 * (authenticated as "testUser"), then checks the primary's profiler entries tagged with
 * 'comment' for the expected impersonated user/role metadata, client metadata and cluster time.
 * Drops the output collection before returning.
 *
 * @param {Array<Object>} pipeline - aggregation pipeline to run; callers pass one ending in a
 *     $merge or $out stage that writes to 'outCollName'.
 * @param {string} comment - comment attached to the aggregate; used to locate the operation in
 *     $currentOp on the secondary and in system.profile on the primary, so it must be unique
 *     per call.
 */
function testMetadata(pipeline, comment) {
    // Enable the fail point on the primary before starting the aggregate. It is only disabled
    // after the in-flight operation has been found in $currentOp on the secondary below.
    configureFailPoint(primary, "hangDuringBatchInsert");

    // 'comment' is serialized with tojson(), like 'pipeline', so that it is embedded as a
    // correctly quoted and escaped string literal in the parallel shell's script.
    const runAggregate = `
            const testDB = db.getSiblingDB("test");
            assert.eq(testDB.auth("testUser", "pwd"), 1);
            const res = testDB.runCommand({
                aggregate: "inputColl",
                pipeline: ${tojson(pipeline)},
                writeConcern: {w: 2},
                comment: ${tojson(comment)},
                cursor: {},
                $readPreference: {mode: "secondary"}
            });
            assert.commandWorked(res);
        `;
    const awaitShell = startParallelShell(runAggregate, secondary.port);

    // Get the client metadata and cluster time values from the secondary that we expect to be
    // propagated to the primary.
    let expectedClientMetadata = {};
    let secondaryClusterTime = {};
    assert.soon(() => {
        const curOps =
            secondary.getDB("admin")
                .aggregate([{$currentOp: {allUsers: true}}, {$match: {"command.comment": comment}}])
                .toArray();
        if (curOps.length === 0) {
            return false;
        }
        assert.eq(curOps.length, 1);
        expectedClientMetadata = curOps[0].clientMetadata;
        secondaryClusterTime = curOps[0].command.$clusterTime.clusterTime;
        return true;
    });

    // Release the primary and wait for the parallel shell (and thus the aggregate) to finish.
    assert.commandWorked(
        primary.adminCommand({configureFailPoint: "hangDuringBatchInsert", mode: "off"}));
    awaitShell();

    // Assert that the impersonated user metadata, client metadata, and cluster time are propagated
    // to the primary.
    const profile =
        primaryDB.system.profile.find({"command.comment": comment}).hint({$natural: 1}).toArray();

    // Without this check the loop below would pass vacuously if nothing carrying 'comment' had
    // been profiled on the primary.
    assert.gt(profile.length,
              0,
              `expected at least one profiler entry on the primary with comment: ${comment}`);

    let prevClusterTime = undefined;
    profile.forEach(op => {
        assert.eq(op.command.$audit, expectedUserAndRoleMetadata);
        assert.eq(op.command.$client, expectedClientMetadata);

        // If this was a $out, then there will be multiple entries due to the temporary collection
        // creation and rename. The first entry should have at least the cluster time that was
        // propagated from the secondary, while each subsequent entry should have a strictly
        // increasing cluster time.
        if (prevClusterTime === undefined) {
            assert.gte(op.command.$clusterTime.clusterTime, secondaryClusterTime);
        } else {
            assert.gt(op.command.$clusterTime.clusterTime, prevClusterTime);
        }
        prevClusterTime = op.command.$clusterTime.clusterTime;
    });

    // Drop the output collection to ensure that the drop replicates to the secondary. This
    // ensures that any state created by this test is cleaned up in preparation for later test
    // cases.
    const outColl = primaryDB.getCollection(outCollName);
    outColl.drop({writeConcern: {w: 2}});
}

// Each case is run through testMetadata() in order: first a $merge, then a $out, both writing
// to 'outCollName'. Every case uses a distinct comment so its operations can be told apart.
const metadataTestCases = [
    {
        comment: "merge_on_secondary_metadata",
        pipeline: [{$merge: {into: outCollName, whenMatched: "fail", whenNotMatched: "insert"}}],
    },
    {
        comment: "out_on_secondary_metadata",
        pipeline: [{$group: {_id: "$_id", sum: {$sum: "$a"}}}, {$out: outCollName}],
    },
];

for (const {pipeline, comment} of metadataTestCases) {
    testMetadata(pipeline, comment);
}

// Shut the replica set down once every case has run.
replTest.stopSet();
})();