summaryrefslogtreecommitdiff
path: root/jstests/sharding/explain_agg_read_pref.js
blob: 0e774e4d8a8d8fc394909212e3d68154a3d1bbb7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/**
 * Tests that readPref applies on an explain for an aggregation command.
 */
(function() {
"use strict";

load("jstests/libs/profiler.js");  // For profilerHasAtLeastOneMatchingEntryOrThrow.

const st = new ShardingTest({
    name: "agg_explain_readPref",
    shards: 2,
    other: {
        rs0: {
            nodes: [
                {rsConfig: {priority: 1, tags: {"tag": "primary"}}},
                {rsConfig: {priority: 0, tags: {"tag": "secondary"}}}
            ]
        },
        rs1: {
            nodes: [
                {rsConfig: {priority: 1, tags: {"tag": "primary"}}},
                {rsConfig: {priority: 0, tags: {"tag": "secondary"}}}
            ]
        },
        enableBalancer: false
    }
});

const mongos = st.s;
const config = mongos.getDB("config");
const mongosDB = mongos.getDB("agg_explain_readPref");
assert.commandWorked(mongosDB.dropDatabase());

const coll = mongosDB.getCollection("coll");

assert.commandWorked(config.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), "agg_explain_readPref-rs0");
const rs0Primary = st.rs0.getPrimary();
const rs0Secondary = st.rs0.getSecondary();
const rs1Primary = st.rs1.getPrimary();
const rs1Secondary = st.rs1.getSecondary();

for (let i = 0; i < 10; ++i) {
    assert.writeOK(coll.insert({a: i}));
}

//
// Confirms that aggregations with explain run against mongos are executed against a tagged
// secondary or primary, as per readPreference setting.
//
function confirmReadPreference(primary, secondary) {
    assert.commandWorked(secondary.setProfilingLevel(2));
    assert.commandWorked(primary.setProfilingLevel(2));

    // [<pref>, <tags>, <target>, <comment>]
    [['primary', [{}], primary, "primary"],
     ['primaryPreferred', [{tag: 'secondary'}], primary, "primaryPreferred"],
     ['secondary', [{}], secondary, "secondary"],
     ['secondary', [{tag: 'secondary'}], secondary, "secondaryTag"],
     ['secondaryPreferred', [{tag: 'secondary'}], secondary, "secondaryPreferred"],
     ['secondaryPreferred', [{tag: 'primary'}], primary, "secondaryPreferredTagPrimary"]]
        .forEach(function(args) {
            const pref = args[0], tagSets = args[1], target = args[2], name = args[3];

            //
            // Tests that explain within an aggregate command and an explicit $readPreference
            // targets the correct node in the replica set given by 'target'.
            //
            let comment = name + "_explain_within_query";
            assert.commandWorked(mongosDB.runCommand({
                query:
                    {aggregate: "coll", pipeline: [], comment: comment, cursor: {}, explain: true},
                $readPreference: {mode: pref, tags: tagSets}
            }));

            // Look for an operation without an exception, since the shard throws a stale config
            // exception if the shard or mongos has stale routing metadata, and the operation
            // gets retried.
            // Note, we look for *at least one* (not exactly one) matching entry: Mongos cancels
            // requests to all shards on receiving a stale version error from any shard.
            // However, the requests may have reached the other shards before they are canceled.
            // If the other shards were already fresh, they will re-receive the request in the
            // next attempt, meaning the request can show up more than once in the profiler.
            profilerHasAtLeastOneMatchingEntryOrThrow({
                profileDB: target,
                filter: {
                    "ns": coll.getFullName(),
                    "command.explain.aggregate": coll.getName(),
                    "command.explain.comment": comment,
                    "command.$readPreference.mode": pref == 'primary' ? null : pref,
                    "errMsg": {"$exists": false}
                }
            });

            //
            // Tests that an aggregation command wrapped in an explain with explicit
            // $queryOptions targets the correct node in the replica set given by 'target'.
            //
            comment = name + "_explain_wrapped_agg";
            assert.commandWorked(mongosDB.runCommand({
                $query: {
                    explain: {
                        aggregate: "coll",
                        pipeline: [],
                        comment: comment,
                        cursor: {},
                    }
                },
                $readPreference: {mode: pref, tags: tagSets}
            }));

            // Look for an operation without an exception, since the shard throws a stale config
            // exception if the shard or mongos has stale routing metadata, and the operation
            // gets retried.
            // Note, we look for *at least one* (not exactly one) matching entry: Mongos cancels
            // requests to all shards on receiving a stale version error from any shard.
            // However, the requests may have reached the other shards before they are canceled.
            // If the other shards were already fresh, they will re-receive the request in the
            // next attempt, meaning the request can show up more than once in the profiler.
            profilerHasAtLeastOneMatchingEntryOrThrow({
                profileDB: target,
                filter: {
                    "ns": coll.getFullName(),
                    "command.explain.aggregate": coll.getName(),
                    "command.explain.comment": comment,
                    "command.$readPreference.mode": pref == 'primary' ? null : pref,
                    "errMsg": {"$exists": false}
                }
            });
        });

    assert.commandWorked(secondary.setProfilingLevel(0));
    assert.commandWorked(primary.setProfilingLevel(0));
}

//
// Test aggregate explains run against an unsharded collection.
//
confirmReadPreference(rs0Primary.getDB(mongosDB.getName()), rs0Secondary.getDB(mongosDB.getName()));

//
// Test aggregate explains run against a sharded collection.
//
assert.commandWorked(coll.createIndex({a: 1}));
assert.commandWorked(config.adminCommand({shardCollection: coll.getFullName(), key: {a: 1}}));
assert.commandWorked(mongos.adminCommand({split: coll.getFullName(), middle: {a: 6}}));
assert.commandWorked(mongosDB.adminCommand(
    {moveChunk: coll.getFullName(), find: {a: 25}, to: "agg_explain_readPref-rs1"}));

// Sharded tests are run against the non-primary shard for the "agg_explain_readPref" db.
confirmReadPreference(rs1Primary.getDB(mongosDB.getName()), rs1Secondary.getDB(mongosDB.getName()));

st.stop();
})();