From d0aee5104eee9ee336b1ed1a298484c75c58d3bb Mon Sep 17 00:00:00 2001 From: Arun Banala Date: Tue, 28 Jan 2020 21:34:41 +0000 Subject: SERVER-45555 Test that the 'comment' field is propagated correctly for sharded $unionWith --- jstests/sharding/comment_field.js | 169 +++++++++++++++++++++++++++++++++----- 1 file changed, 149 insertions(+), 20 deletions(-) diff --git a/jstests/sharding/comment_field.js b/jstests/sharding/comment_field.js index 236e760c4fb..293839e6db8 100644 --- a/jstests/sharding/comment_field.js +++ b/jstests/sharding/comment_field.js @@ -63,8 +63,15 @@ function setPostCommandFailpointOnShards({mode, options}) { }); } -function runCommentParamTest( - {coll, command, expectedRunningOps, commentObj, cmdName, parallelFunction}) { +function runCommentParamTest({ + coll, + command, + expectedRunningOps, + expectedProfilerEntries, + commentObj, + cmdName, + parallelFunction +}) { if (!cmdName) { cmdName = Object.keys(command)[0]; } @@ -98,6 +105,18 @@ function runCommentParamTest( assert.commandWorked(cmdRes); `; } + // Force a refresh on the shards. This is necessary because MongoS could get StaleDbVersion + // error upon sending an agg request, causing it to retry the agg command from the top and + // resulting in more profiler entries than what is expected. + assert.commandWorked(st.rs0.getPrimary().getDB(testDB.getName()).adminCommand({ + _flushDatabaseCacheUpdates: testDB.getName(), + syncFromConfig: true + })); + assert.commandWorked(st.rs1.getPrimary().getDB(testDB.getName()).adminCommand({ + _flushDatabaseCacheUpdates: testDB.getName(), + syncFromConfig: true + })); + // Run the 'command' in a parallel shell. let outShell = startParallelShell(parallelFunction, st.s.port); @@ -121,22 +140,20 @@ function runCommentParamTest( .aggregate([{$currentOp: {localOps: true}}, {$match: filter}]) .toArray() .length, - 1); + 1, + testDB.getSiblingDB("admin").aggregate([{$currentOp: {localOps: true}}]).toArray()); // Unset the failpoint to unblock the command and join with the parallel shell. setPostCommandFailpointOnShards({mode: "off", options: {}}); outShell(); - // For 'update' and 'delete' commands log lines and profiler entries are added for each - // sub-operation. - let expectedAdditionalEntries = (cmdName === "update") ? command["updates"].length : 0; - expectedAdditionalEntries += (cmdName === "delete") ? command["deletes"].length : 0; - // Verify that profile entry has 'comment' field. + expectedProfilerEntries = + expectedProfilerEntries ? expectedProfilerEntries : expectedRunningOps; const profileFilter = {"command.comment": commentObj}; assert.eq(shard0DB.system.profile.find(profileFilter).itcount() + shard1DB.system.profile.find(profileFilter).itcount(), - (expectedAdditionalEntries > 0) ? expectedAdditionalEntries : expectedRunningOps, + expectedProfilerEntries, () => tojson({ [st.shard0.name]: shard0DB.system.profile.find().toArray(), [st.shard1.name]: shard1DB.system.profile.find().toArray() @@ -145,8 +162,6 @@ function runCommentParamTest( // Run the 'checkLog' only for commands with uuid so that the we know the log line belongs to // current operation. if (commentObj["uuid"]) { - let expectedLogLines = expectedRunningOps + 1; // +1 for log line on mongos. - expectedLogLines += expectedAdditionalEntries; verifyLogContains( [testDB, shard0DB, shard1DB], // Verify that a field with 'comment' exists in the same line as the command. @@ -155,7 +170,14 @@ function runCommentParamTest( checkLog.formatAsLogLine(commentObj), 'appName: "MongoDB Shell" command: ' + ((cmdName === "getMore") ? cmdName : "") ], - expectedLogLines); + + ((cmdName === "update" || cmdName === "delete") + ? expectedRunningOps + : 0) + // For 'update' and 'delete' commands we also log an additional line + // for the entire operation. + expectedProfilerEntries + + 1 // +1 to account for log line on mongos. + ); } } @@ -198,7 +220,8 @@ runCommentParamTest({ expectedRunningOps: 1 }); -// For update command on a sharded collection, when all the shards are targetted. +// For update command on a sharded collection, when all the shards are targetted. For update command +// profiler entries are only added for each sub-operation. runCommentParamTest({ coll: shardedColl, command: { @@ -210,14 +233,16 @@ runCommentParamTest({ ], ordered: false }, - expectedRunningOps: 2 + expectedRunningOps: 2, + expectedProfilerEntries: 3 }); // For update command on a sharded collection, where a single shard is targetted. runCommentParamTest({ coll: shardedColl, command: {update: shardedColl.getName(), updates: [{q: {x: 3}, u: {x: 3, a: 1}}]}, - expectedRunningOps: 1 + expectedRunningOps: 1, + expectedProfilerEntries: 1 }); // For update command on an unsharded collection, where only primary shard is targetted. @@ -227,10 +252,12 @@ runCommentParamTest({ update: unshardedColl.getName(), updates: [{q: {x: 1}, u: {x: 1}}, {q: {x: -1}, u: {x: -1, a: 1}}] }, - expectedRunningOps: 1 + expectedRunningOps: 1, + expectedProfilerEntries: 2 }); -// For delete command on a sharded collection, where all the shards are targetted. +// For delete command on a sharded collection, where all the shards are targetted. For delete +// command profiler entries are only added for each sub-operation. runCommentParamTest({ coll: shardedColl, command: { @@ -238,7 +265,8 @@ runCommentParamTest({ deletes: [{q: {x: 3}, limit: 0}, {q: {x: -3}, limit: 0}, {q: {x: -1}, limit: 0}], ordered: false }, - expectedRunningOps: 2 + expectedRunningOps: 2, + expectedProfilerEntries: 3 }); // For delete command on a sharded collection, where a single shard is targetted. @@ -246,7 +274,8 @@ runCommentParamTest({ coll: shardedColl, command: {delete: shardedColl.getName(), deletes: [{q: {x: 1}, limit: 0}, {q: {x: 3}, limit: 0}]}, - expectedRunningOps: 1 + expectedRunningOps: 1, + expectedProfilerEntries: 2 }); // For delete command on an unsharded collection, where only primary shard is targetted. @@ -256,7 +285,8 @@ runCommentParamTest({ delete: unshardedColl.getName(), deletes: [{q: {_id: 1}, limit: 0}, {q: {_id: 0}, limit: 0}] }, - expectedRunningOps: 1 + expectedRunningOps: 1, + expectedProfilerEntries: 2 }); // For createIndexes command on a sharded collection, where all the shards are targetted. @@ -483,6 +513,105 @@ runCommentParamTest({ commentObj: comment }); +// +// Tests for '$unionWith' stage as part of aggregate on a sharded collection. +// + +// For aggregate command with a $unionWith stage, where a sharded collection unions with a sharded +// collection, each shard recieves an aggregate operation for the outer pipeline (with the stages +// prior to the $unionWith stage) and the inner pipeline. Each aggregate operation is followed up by +// a getMore to exhaust the cursor. So there should be 8 profiler entries which has the 'comment' +// field. In addition there is an aggregate operation which does merge cursors. +runCommentParamTest({ + coll: shardedColl, + command: { + aggregate: shardedColl.getName(), + pipeline: [ + {$match: {p: null}}, + {$unionWith: {coll: shardedColl.getName(), pipeline: [{$group: {_id: "$x"}}]}} + ], + cursor: {} + }, + expectedRunningOps: 2, + expectedProfilerEntries: 9 +}); + +// For aggregate command with a $unionWith stage, where a sharded collection unions with an +// unsharded collection, each shard recieves an aggregate & getMore operation (with the stages prior +// to the $unionWith stage) for the outer pipeline and 1 aggregate operation for the inner pipeline +// on unsharded collection. So there should be 5 profiler entries which has the 'comment' field. In +// addition there is an aggregate operation which does merge cursors. +runCommentParamTest({ + coll: shardedColl, + command: { + aggregate: shardedColl.getName(), + pipeline: [ + {$match: {p: null}}, + {$unionWith: {coll: unshardedColl.getName(), pipeline: [{$group: {_id: "$x"}}]}} + ], + cursor: {} + }, + expectedRunningOps: 2, + expectedProfilerEntries: 6 +}); + +// For aggregate command with a $unionWith stage, where an unsharded collection unions with a +// sharded collection, each shard recieves an aggregate & getMore operation for the inner pipeline. +// So there should be 4 profiler entries which has the 'comment' field. In addition there is an +// aggregate operation which does merge cursors. +runCommentParamTest({ + coll: unshardedColl, + command: { + aggregate: unshardedColl.getName(), + pipeline: [{$unionWith: shardedColl.getName()}], + cursor: {} + }, + expectedRunningOps: 1, + expectedProfilerEntries: 5 +}); + +// For aggregate command with a $unionWith stage, where a sharded collection unions with a sharded +// collection. When the batch size is 0, no getMore commands should be issued. In this case, the +// outer pipeline would still be split. +runCommentParamTest({ + coll: shardedColl, + command: { + aggregate: shardedColl.getName(), + pipeline: [{$unionWith: shardedColl.getName()}], + cursor: {batchSize: 0} + }, + expectedRunningOps: 2, + expectedProfilerEntries: + 3 // Additional profiler entry for the pipeline that executes merge cursors. +}); + +// For aggregate command with a $unionWith stage, where a sharded collection unions with an +// unsharded collection. When the batch size is 0, no getMore commands should be issued. In this +// case, the outer pipeline would still be split. +runCommentParamTest({ + coll: shardedColl, + command: { + aggregate: shardedColl.getName(), + pipeline: [{$unionWith: unshardedColl.getName()}], + cursor: {batchSize: 0} + }, + expectedRunningOps: 2, + expectedProfilerEntries: + 3 // Additional profiler entry for the pipeline that executes merge cursors. +}); + +// For aggregate command with a $unionWith stage, where an unsharded collection unions with a +// sharded collection. When the batch size is 0, no getMore commands should be issued. +runCommentParamTest({ + coll: unshardedColl, + command: { + aggregate: unshardedColl.getName(), + pipeline: [{$unionWith: shardedColl.getName()}], + cursor: {batchSize: 0} + }, + expectedRunningOps: 1 +}); + // // Tests for Legacy query. // -- cgit v1.2.1