// Verifies that an aggregation ending in a $out stage honors the options supplied on the
// aggregate command (maxTimeMS and $readPreference), including for the listIndexes command that
// is issued to validate the uniqueKey.
(function() {
    'use strict';

    load("jstests/libs/profiler.js");  // Provides profilerHasNumMatchingEntriesOrThrow.

    const st = new ShardingTest({shards: 2, rs: {nodes: 2}});

    const testDB = st.s0.getDB("test");
    const source = testDB.getCollection("source");
    const target = testDB.getCollection("target");
    const primaryShardDB = st.rs0.getPrimary().getDB("test");
    const otherShardDB = st.rs1.getPrimary().getDB("test");
    const maxTimeMS = 5 * 60 * 1000;

    // Turn on full profiling for the test database on the primary node of each shard.
    assert.commandWorked(primaryShardDB.setProfilingLevel(2));
    assert.commandWorked(otherShardDB.setProfilingLevel(2));

    // Enable sharding for the test database and make shard0 its primary shard.
    assert.commandWorked(testDB.adminCommand({enableSharding: testDB.getName()}));
    st.ensurePrimaryShard(testDB.getName(), st.rs0.getURL());

    // Shard the target collection with 'unique: true' so that a unique index exists on the shard
    // key, then split at {sk: 1} and move the chunk containing {sk: 1} to shard1.
    const shardKey = {sk: 1};
    assert.commandWorked(testDB.adminCommand(
        {shardCollection: target.getFullName(), key: shardKey, unique: true}));
    assert.commandWorked(testDB.adminCommand({split: target.getFullName(), middle: {sk: 1}}));
    assert.commandWorked(testDB.adminCommand(
        {moveChunk: target.getFullName(), find: {sk: 1}, to: st.rs1.getURL()}));

    assert.commandWorked(source.insert({sk: "dummy"}));

    /**
     * Returns a new single-stage pipeline that writes to the target collection using $out in
     * "replaceDocuments" mode, with the shard key as the uniqueKey.
     */
    function replaceDocumentsPipeline() {
        return [{$out: {to: target.getName(), mode: "replaceDocuments", uniqueKey: shardKey}}];
    }

    /**
     * Throws unless the profiler of 'profileDB' holds exactly 'numExpectedMatches' entries for a
     * listIndexes command on the target collection.
     */
    function assertListIndexesProfiled(profileDB, numExpectedMatches) {
        const listIndexesFilter = {
            ns: target.getFullName(),
            "command.listIndexes": target.getName()
        };
        profilerHasNumMatchingEntriesOrThrow({
            profileDB: profileDB,
            filter: listIndexesFilter,
            numExpectedMatches: numExpectedMatches
        });
    }

    /**
     * Throws unless the profiler of 'profileDB' holds exactly 'numExpectedMatches' entries for an
     * aggregate on the source collection whose 'optionPath' field equals 'optionValue'.
     */
    function assertAggregateProfiled(profileDB, optionPath, optionValue, numExpectedMatches) {
        const aggregateFilter = {
            ns: source.getFullName(),
            "command.aggregate": source.getName()
        };
        aggregateFilter[optionPath] = optionValue;
        profilerHasNumMatchingEntriesOrThrow({
            profileDB: profileDB,
            filter: aggregateFilter,
            numExpectedMatches: numExpectedMatches
        });
    }

    /**
     * Sets the "maxTimeAlwaysTimeOut" fail point on the primary shard to the given mode.
     */
    function setMaxTimeAlwaysTimeOut(mode) {
        assert.commandWorked(primaryShardDB.getSiblingDB("admin").runCommand(
            {configureFailPoint: "maxTimeAlwaysTimeOut", mode: mode}));
    }

    // Sharding the collection above results in one listIndexes on the target collection.
    assertListIndexesProfiled(primaryShardDB, 1);

    /**
     * Checks that maxTimeMS applies both to the listIndexes command used for uniqueKey validation
     * and to the $out aggregation itself.
     */
    function testMaxTimeMS() {
        const aggOptions = {
            pipeline: replaceDocumentsPipeline(),
            cursor: {},
            maxTimeMS: maxTimeMS
        };
        assert.commandWorked(source.runCommand("aggregate", aggOptions));

        // The aggregate on the source collection must be profiled with its maxTimeMS.
        assertAggregateProfiled(primaryShardDB, "command.maxTimeMS", maxTimeMS, 1);

        // Only the primary shard should receive the listIndexes. Its maxTimeMS is *not* visible
        // in the profiler, because the value is applied as a timeout on the remote command rather
        // than being sent as part of the command.
        assertListIndexesProfiled(primaryShardDB, 2);
    }

    /**
     * Checks that the $out aggregation fails with MaxTimeMSExpired when the time limit is hit.
     */
    function testTimeout() {
        // With this fail point on, mongod throws for any operation that carries a max time.
        setMaxTimeAlwaysTimeOut("alwaysOn");

        const result = source.runCommand("aggregate", {
            pipeline: replaceDocumentsPipeline(),
            cursor: {},
            maxTimeMS: maxTimeMS
        });
        const failureMsg = "expected aggregate to fail with code " +
            ErrorCodes.MaxTimeMSExpired +
            " due to maxTimeAlwaysTimeOut fail point, but instead got: " + tojson(result);
        assert.commandFailedWithCode(result, ErrorCodes.MaxTimeMSExpired, failureMsg);

        // The initial listIndexes should have timed out, so this aggregate never reaches the
        // profiler: the count stays at the single entry recorded by testMaxTimeMS.
        assertAggregateProfiled(primaryShardDB, "command.maxTimeMS", maxTimeMS, 1);

        // The primary shard must show one more listIndexes entry than before.
        assertListIndexesProfiled(primaryShardDB, 3);

        setMaxTimeAlwaysTimeOut("off");
    }

    /**
     * Checks that a read preference given on the $out aggregation is also used for the
     * listIndexes command.
     */
    function testReadPreference() {
        const secondaryDB = st.rs0.getSecondary().getDB("test");
        assert.commandWorked(secondaryDB.setProfilingLevel(2));

        // TODO SERVER-37250: Ban $readPreference secondary with new $out modes.
        assert.commandWorked(source.runCommand("aggregate", {
            pipeline: replaceDocumentsPipeline(),
            cursor: {},
            $readPreference: {mode: "secondary"}
        }));

        // The listIndexes must appear in the secondary's profiler...
        assertListIndexesProfiled(secondaryDB, 1);

        // ...and must *not* add another entry on the primary shard.
        assertListIndexesProfiled(primaryShardDB, 3);

        assertAggregateProfiled(
            secondaryDB, "command.$readPreference", {mode: "secondary"}, 1);

        // "replaceCollection" mode writes directly to a local temp collection, so it cannot run
        // against a secondary.
        const replaceCollectionRes = source.runCommand("aggregate", {
            pipeline: [{$out: {to: "non_existent", mode: "replaceCollection"}}],
            cursor: {},
            $readPreference: {mode: "secondary"}
        });
        assert.commandFailedWithCode(replaceCollectionRes,
                                     16994,
                                     "Expected $out to fail to create the temp collection.");
    }

    // The profiler counts asserted above are cumulative, so the cases must run in this order.
    testMaxTimeMS();
    testTimeout();
    testReadPreference();

    st.stop();
})();