1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
|
/**
* Tests command output from the $operationMetrics aggregation stage.
* @tags: [
* requires_fcv_63,
* requires_replication,
* ]
*/
(function() {
'use strict';
load("jstests/libs/os_helpers.js"); // For isLinux().
var rst = new ReplSetTest({
nodes: 2,
nodeOptions: {setParameter: {"aggregateOperationResourceConsumptionMetrics": true}}
});
rst.startSet();
rst.initiate();
let assertMetricsExist = function(metrics) {
try {
assert.neq(metrics, undefined);
assert(metrics.hasOwnProperty("db"));
assert(metrics.hasOwnProperty("localTime"));
let primaryMetrics = metrics.primaryMetrics;
let secondaryMetrics = metrics.secondaryMetrics;
[primaryMetrics, secondaryMetrics].forEach((readMetrics) => {
assert.gte(readMetrics.docBytesRead, 0);
assert.gte(readMetrics.docUnitsRead, 0);
assert.gte(readMetrics.idxEntryBytesRead, 0);
assert.gte(readMetrics.idxEntryUnitsRead, 0);
assert.gte(readMetrics.keysSorted, 0);
assert.gte(readMetrics.sorterSpills, 0);
assert.gte(readMetrics.docUnitsReturned, 0);
assert.gte(readMetrics.cursorSeeks, 0);
});
assert.gte(metrics.cpuNanos, 0);
assert.gte(metrics.docBytesWritten, 0);
assert.gte(metrics.docUnitsWritten, 0);
assert.gte(metrics.idxEntryBytesWritten, 0);
assert.gte(metrics.idxEntryUnitsWritten, 0);
assert.gte(metrics.totalUnitsWritten, 0);
} catch (e) {
print("caught exception while checking metrics output: " + tojson(metrics));
throw e;
}
};
let getDBMetrics = (adminDB) => {
let cursor = adminDB.aggregate([{$operationMetrics: {}}]);
// Merge all returned documents into a single object keyed by database name.
let allMetrics = {};
while (cursor.hasNext()) {
let doc = cursor.next();
allMetrics[doc.db] = doc;
}
return allMetrics;
};
let getServerStatusMetrics = (db) => {
let ss = db.serverStatus();
assert(ss.hasOwnProperty('resourceConsumption'), ss);
return ss.resourceConsumption;
};
const primary = rst.getPrimary();
// $operationMetrics may only be run against the admin database and in a 'collectionless' form.
assert.commandFailedWithCode(primary.getDB('invalid').runCommand({
aggregate: 1,
pipeline: [{$operationMetrics: {}}],
cursor: {},
}),
ErrorCodes.InvalidNamespace);
assert.commandFailedWithCode(primary.getDB('admin').runCommand({
aggregate: 'test',
pipeline: [{$operationMetrics: {}}],
cursor: {},
}),
ErrorCodes.InvalidNamespace);
// Perform very basic reads and writes on two different databases.
const db1Name = 'db1';
const db1 = primary.getDB(db1Name);
assert.commandWorked(db1.coll1.insert({a: 1}));
assert.commandWorked(db1.coll2.insert({a: 1}));
const db2Name = 'db2';
const db2 = primary.getDB(db2Name);
assert.commandWorked(db2.coll1.insert({a: 1}));
assert.commandWorked(db2.coll2.insert({a: 1}));
const secondary = rst.getSecondary();
[primary, secondary].forEach(function(node) {
jsTestLog("Testing node: " + node);
// Clear metrics after waiting for replication to ensure we are not observing metrics from
// a previous loop iteration.
rst.awaitReplication();
const adminDB = node.getDB('admin');
let initialCpuTime = getServerStatusMetrics(adminDB).cpuNanos;
adminDB.aggregate([{$operationMetrics: {clearMetrics: true}}]);
assert.eq(node.getDB(db1Name).coll1.find({a: 1}).itcount(), 1);
assert.eq(node.getDB(db1Name).coll2.find({a: 1}).itcount(), 1);
assert.eq(node.getDB(db2Name).coll1.find({a: 1}).itcount(), 1);
assert.eq(node.getDB(db2Name).coll2.find({a: 1}).itcount(), 1);
// Run an aggregation with a batch size of 1.
let cursor = adminDB.aggregate([{$operationMetrics: {}}], {cursor: {batchSize: 1}});
assert(cursor.hasNext());
// Merge all returned documents into a single object keyed by database name.
let allMetrics = {};
let doc = cursor.next();
allMetrics[doc.db] = doc;
assert.eq(cursor.objsLeftInBatch(), 0);
// Trigger a getMore to retrieve metrics for the other database.
assert(cursor.hasNext());
doc = cursor.next();
allMetrics[doc.db] = doc;
assert(!cursor.hasNext());
// Ensure the two user database have present metrics.
assertMetricsExist(allMetrics[db1Name]);
assertMetricsExist(allMetrics[db2Name]);
let ssMetrics = getServerStatusMetrics(adminDB);
assert.eq(ssMetrics.numMetrics, 2);
assert.gt(ssMetrics.memUsage, 0);
// Ensure read metrics are attributed to the correct replication state.
let lastDocBytesRead;
if (node === primary) {
[db1Name, db2Name].forEach((db) => {
assert.gt(allMetrics[db].primaryMetrics.docBytesRead, 0);
assert.gt(allMetrics[db].primaryMetrics.docUnitsRead, 0);
assert.eq(allMetrics[db].primaryMetrics.cursorSeeks, 0);
assert.eq(allMetrics[db].secondaryMetrics.docBytesRead, 0);
assert.eq(allMetrics[db].secondaryMetrics.docUnitsRead, 0);
assert.eq(allMetrics[db].secondaryMetrics.cursorSeeks, 0);
});
assert.eq(allMetrics[db1Name].primaryMetrics.docBytesRead,
allMetrics[db2Name].primaryMetrics.docBytesRead);
lastDocBytesRead = allMetrics[db1Name].primaryMetrics.docBytesRead;
} else {
[db1Name, db2Name].forEach((db) => {
assert.gt(allMetrics[db].secondaryMetrics.docBytesRead, 0);
assert.gt(allMetrics[db].secondaryMetrics.docUnitsRead, 0);
assert.eq(allMetrics[db].secondaryMetrics.cursorSeeks, 0);
assert.eq(allMetrics[db].primaryMetrics.docBytesRead, 0);
assert.eq(allMetrics[db].primaryMetrics.docUnitsRead, 0);
assert.eq(allMetrics[db].primaryMetrics.cursorSeeks, 0);
});
assert.eq(allMetrics[db1Name].secondaryMetrics.docBytesRead,
allMetrics[db2Name].secondaryMetrics.docBytesRead);
lastDocBytesRead = allMetrics[db1Name].secondaryMetrics.docBytesRead;
}
// CPU time aggregation is only supported on Linux.
if (isLinux()) {
// Ensure the CPU time is increasing.
let lastCpuTime = getServerStatusMetrics(adminDB).cpuNanos;
assert.gt(lastCpuTime, initialCpuTime);
// Ensure the global CPU time matches the aggregated time for both databases.
assert.eq(lastCpuTime - initialCpuTime,
allMetrics[db1Name].cpuNanos + allMetrics[db2Name].cpuNanos);
}
// Metrics for these databases should not be collected or reported.
assert.eq(allMetrics['admin'], undefined);
assert.eq(allMetrics['local'], undefined);
assert.eq(allMetrics['config'], undefined);
// Ensure this stage can be composed with other pipeline stages.
const newDbName = "newDB";
const newCollName = "metrics_out";
cursor = adminDB.aggregate([
{$operationMetrics: {}},
{$project: {db: 1}},
{$out: {db: newDbName, coll: newCollName}},
]);
// No results from the aggregation because of the $out.
assert.eq(cursor.itcount(), 0);
// There are no additional metrics for the new database because the command was run on the
// 'admin' database and it does not collect metrics.
cursor = adminDB.aggregate([{$operationMetrics: {}}]);
assert.eq(cursor.itcount(), 2);
// Metrics should not have changed.
allMetrics = getDBMetrics(adminDB);
if (node === primary) {
assert.eq(allMetrics[db1Name].primaryMetrics.docBytesRead, lastDocBytesRead);
assert.eq(allMetrics[db2Name].primaryMetrics.docBytesRead, lastDocBytesRead);
} else {
assert.eq(allMetrics[db1Name].secondaryMetrics.docBytesRead, lastDocBytesRead);
assert.eq(allMetrics[db2Name].secondaryMetrics.docBytesRead, lastDocBytesRead);
}
// Ensure the output collection has the 2 databases that existed at the start of the operation.
rst.awaitReplication();
cursor = node.getDB(newDbName)[newCollName].find({});
assert.eq(cursor.itcount(), 2);
primary.getDB(newDbName).dropDatabase();
// Fetch and don't clear metrics.
cursor = adminDB.aggregate([{$operationMetrics: {clearMetrics: false}}]);
assert.eq(cursor.itcount(), 3);
// Fetch and clear metrics.
cursor = adminDB.aggregate([{$operationMetrics: {clearMetrics: true}}]);
assert.eq(cursor.itcount(), 3);
// Ensure no metrics are reported.
cursor = adminDB.aggregate([{$operationMetrics: {}}]);
assert.eq(cursor.itcount(), 0);
// Ensure the serverStatus metrics are cleared except for cpuNanos.
ssMetrics = getServerStatusMetrics(adminDB);
assert.eq(0, ssMetrics.numMetrics);
assert.eq(0, ssMetrics.memUsage);
if (isLinux()) {
assert.neq(0, ssMetrics.cpuNanos);
} else {
assert.eq(0, ssMetrics.cpuNanos);
}
// Insert something and ensure metrics are still reporting.
assert.commandWorked(db1.coll3.insert({a: 1}));
rst.awaitReplication();
// On the primary, this insert's metrics should be recorded, but not on the secondary. Since it
// is applied by the batch applier on the secondary, it is not a user operation and should not
// count toward any metrics.
cursor = adminDB.aggregate([{$operationMetrics: {}}]);
if (node === primary) {
assert.eq(cursor.itcount(), 1);
} else {
assert.eq(cursor.itcount(), 0);
}
db1.coll3.drop();
});
rst.stopSet();
}());
|