1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
|
// Tests that aggregations with a $merge stage respect the options set on the command.
(function() {
'use strict';
load("jstests/libs/profiler.js"); // For profilerHasNumMatchingEntriesOrThrow.
const st = new ShardingTest({shards: 2, rs: {nodes: 2}});
const mongosDB = st.s0.getDB("test");
const source = mongosDB.getCollection("source");
const target = mongosDB.getCollection("target");
const primaryDB = st.rs0.getPrimary().getDB("test");
const nonPrimaryDB = st.rs1.getPrimary().getDB("test");
const maxTimeMS = 5 * 60 * 1000;
// Enable profiling on the test DB.
assert.commandWorked(primaryDB.setProfilingLevel(2));
assert.commandWorked(nonPrimaryDB.setProfilingLevel(2));
// Enable sharding on the test DB and ensure that shard0 is the primary.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
// Force a refresh on rs0. This is necessary because MongoS will get StaleDbVersion upon sending
// the agg request below, causing it to retry the agg command from the top and thus send
// listIndexes to the primary shard twice.
assert.commandWorked(
st.rs0.getPrimary().getDB('test').adminCommand({_flushDatabaseCacheUpdates: 'test'}));
// Shard the target collection, and set the unique flag to ensure that there's a unique
// index on the shard key.
const shardKey = {sk: 1};
assert.commandWorked(mongosDB.adminCommand(
{shardCollection: target.getFullName(), key: shardKey, unique: true}));
assert.commandWorked(mongosDB.adminCommand({split: target.getFullName(), middle: {sk: 1}}));
assert.commandWorked(mongosDB.adminCommand(
{moveChunk: target.getFullName(), find: {sk: 1}, to: st.rs1.getURL()}));
assert.commandWorked(source.insert({sk: "dummy"}));
// The shardCollection command will send a listIndexes on the target collection.
profilerHasNumMatchingEntriesOrThrow({
profileDB: primaryDB,
filter: {ns: target.getFullName(), "command.listIndexes": target.getName()},
numExpectedMatches: 1
});
// Test that the maxTimeMS value is used for both the listIndexes command for uniqueKey
// validation as well as the $merge aggregation itself.
(function testMaxTimeMS() {
assert.commandWorked(source.runCommand("aggregate", {
pipeline: [{
$merge: {
into: target.getName(),
whenMatched: "replace",
whenNotMatched: "insert",
on: Object.keys(shardKey)
}
}],
cursor: {},
maxTimeMS: maxTimeMS
}));
// Verify the profile entry for the aggregate on the source collection.
profilerHasNumMatchingEntriesOrThrow({
profileDB: primaryDB,
filter: {
ns: source.getFullName(),
"command.aggregate": source.getName(),
"command.maxTimeMS": maxTimeMS
},
numExpectedMatches: 1
});
// The listIndexes command should be sent to the primary shard only. Note that the
// maxTimeMS will *not* show up in the profiler since the parameter is used as a timeout for
// the remote command vs. part of the command itself.
profilerHasNumMatchingEntriesOrThrow({
profileDB: primaryDB,
filter: {ns: target.getFullName(), "command.listIndexes": target.getName()},
numExpectedMatches: 2
});
})();
(function testTimeout() {
// Configure the "maxTimeAlwaysTimeOut" fail point on the primary shard, which forces
// mongod to throw if it receives an operation with a max time.
assert.commandWorked(primaryDB.getSiblingDB("admin").runCommand(
{configureFailPoint: "maxTimeAlwaysTimeOut", mode: "alwaysOn"}));
// Test that the $merge correctly fails when the maxTimeMS is exceeded.
const res = source.runCommand("aggregate", {
pipeline: [{
$merge: {
into: target.getName(),
whenMatched: "replace",
whenNotMatched: "insert",
on: Object.keys(shardKey)
}
}],
cursor: {},
maxTimeMS: maxTimeMS
});
assert.commandFailedWithCode(
res,
ErrorCodes.MaxTimeMSExpired,
"expected aggregate to fail with code " + ErrorCodes.MaxTimeMSExpired +
" due to maxTimeAlwaysTimeOut fail point, but instead got: " + tojson(res));
// The actual aggregate should not be in the profiler since the initial listIndexes should
// have timed out.
profilerHasNumMatchingEntriesOrThrow({
profileDB: primaryDB,
filter: {
ns: source.getFullName(),
"command.aggregate": source.getName(),
"command.maxTimeMS": maxTimeMS
},
numExpectedMatches: 1
});
// Verify that there is an additional listIndexes profiler entry on the primary shard.
profilerHasNumMatchingEntriesOrThrow({
profileDB: primaryDB,
filter: {ns: target.getFullName(), "command.listIndexes": target.getName()},
numExpectedMatches: 3
});
assert.commandWorked(primaryDB.getSiblingDB("admin").runCommand(
{configureFailPoint: "maxTimeAlwaysTimeOut", mode: "off"}));
})();
// Test that setting a read preference on the $merge also applies to the listIndexes
// command.
(function testReadPreference() {
const secondaryDB = st.rs0.getSecondary().getDB("test");
assert.commandWorked(secondaryDB.setProfilingLevel(2));
assert.commandWorked(source.runCommand("aggregate", {
pipeline: [{
$merge: {
into: target.getName(),
whenMatched: "replace",
whenNotMatched: "insert",
on: Object.keys(shardKey)
}
}],
cursor: {},
$readPreference: {mode: "secondary"}
}));
// Verify that the profiler on the secondary includes an entry for the listIndexes.
profilerHasNumMatchingEntriesOrThrow({
profileDB: secondaryDB,
filter: {ns: target.getFullName(), "command.listIndexes": target.getName()},
numExpectedMatches: 1
});
// Verify that the primary shard does *not* have an additional listIndexes profiler entry.
profilerHasNumMatchingEntriesOrThrow({
profileDB: primaryDB,
filter: {ns: target.getFullName(), "command.listIndexes": target.getName()},
numExpectedMatches: 3
});
profilerHasNumMatchingEntriesOrThrow({
profileDB: secondaryDB,
filter: {
ns: source.getFullName(),
"command.aggregate": source.getName(),
"command.$readPreference": {mode: "secondary"},
},
numExpectedMatches: 1
});
// Test that $out cannot be run against a secondary since it writes directly to a local temp
// collection.
assert.commandFailedWithCode(source.runCommand("aggregate", {
pipeline: [{$out: "non_existent"}],
cursor: {},
$readPreference: {mode: "secondary"}
}),
16994,
"Expected $out to fail to create the temp collection.");
})();
st.stop();
})();
|