diff options
-rw-r--r-- | buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml | 5 | ||||
-rw-r--r-- | jstests/aggregation/sharded_agg_cleanup_on_error.js | 9 | ||||
-rw-r--r-- | jstests/auth/lib/commands_lib.js | 20 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js (renamed from jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js) | 18 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js (renamed from jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js) | 15 | ||||
-rw-r--r-- | jstests/core/txns/prepare_conflict_aggregation_behavior.js | 14 | ||||
-rw-r--r-- | jstests/core/views/views_aggregation.js | 34 | ||||
-rw-r--r-- | jstests/noPassthrough/merge_max_time_ms.js | 264 | ||||
-rw-r--r-- | jstests/noPassthrough/out_max_time_ms.js | 135 | ||||
-rw-r--r-- | jstests/noPassthrough/out_merge_majority_read.js (renamed from jstests/noPassthrough/out_majority_read.js) | 74 | ||||
-rw-r--r-- | jstests/replsets/linearizable_read_concern.js | 14 |
11 files changed, 410 insertions, 192 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml index b827b8d9fac..505cd412179 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml @@ -184,8 +184,9 @@ selector: # Uses non-retryable commands in the same state function as a command not supported in a # transaction. - - jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js - - jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js + - jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js + - jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js + # TODO SERVER-40713 moveChunk is not considered retryable by the network retry override. - jstests/concurrency/fsm_workloads/agg_with_chunk_migrations.js diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js index c8122290afe..d84c37b3827 100644 --- a/jstests/aggregation/sharded_agg_cleanup_on_error.js +++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js @@ -114,12 +114,15 @@ // Run an aggregation which is eligible for $exchange. This should assert because of // the failpoint. Add a $group stage to force an exchange-eligible split of the pipeline - // before the $out. Without the $group we won't use the exchange optimization and instead - // will send the $out to each shard. + // before the $merge. Without the $group we won't use the exchange optimization and instead + // will send the $merge to each shard. st.shardColl(mongosDB.target, {_id: 1}, {_id: 0}, {_id: 1}, kDBName, false); assertErrorCode( coll, - [{$group: {_id: "$fakeShardKey"}}, {$out: {to: "target", mode: "replaceDocuments"}}], + [ + {$group: {_id: "$fakeShardKey"}}, + {$merge: {into: "target", whenMatched: "replaceWithNew", whenNotMatched: "insert"}} + ], ErrorCodes.FailPointEnabled); // Neither mongos or the shards should leave cursors open. diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js index 2580ebe6635..86b560718c7 100644 --- a/jstests/auth/lib/commands_lib.js +++ b/jstests/auth/lib/commands_lib.js @@ -1113,11 +1113,17 @@ var authCommandsLib = { ] }, { - testname: "aggregate_out_insert_documents", + testname: "aggregate_merge_insert_documents", command: function(state, args) { return { aggregate: "foo", - pipeline: [{$out: {db: args.targetDB, to: "foo_out", mode: "insertDocuments"}}], + pipeline: [{ + $merge: { + into: {db: args.targetDB, coll: "foo_out"}, + whenMatched: "fail", + whenNotMatched: "insert" + } + }], cursor: {}, bypassDocumentValidation: args.bypassDocumentValidation, }; @@ -1181,11 +1187,17 @@ var authCommandsLib = { ] }, { - testname: "aggregate_out_replace_documents", + testname: "aggregate_merge_replace_documents", command: function(state, args) { return { aggregate: "foo", - pipeline: [{$out: {db: args.targetDB, to: "foo_out", mode: "replaceDocuments"}}], + pipeline: [{ + $merge: { + into: {db: args.targetDB, coll: "foo_out"}, + whenMatched: "replaceWithNew", + whenNotMatched: "insert" + } + }], cursor: {}, bypassDocumentValidation: args.bypassDocumentValidation, }; diff --git a/jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js b/jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js index 09676948073..0739be35561 100644 --- a/jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js +++ b/jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js @@ -1,10 +1,10 @@ 'use strict'; /** - * agg_out_mode_replace_documents.js + * agg_merge_when_matched_replace_with_new.js * - * Tests $out with mode "replaceDocuments" concurrently with moveChunk operations on the output - * collection. + * Tests $merge with whenMatched set to "replaceWithNew" concurrently with moveChunk operations on + * the output collection. * * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off, * requires_non_retryable_writes] @@ -14,7 +14,7 @@ load('jstests/concurrency/fsm_workloads/agg_with_chunk_migrations.js'); // for var $config = extendWorkload($config, function($config, $super) { // Set the collection to run concurrent moveChunk operations as the output collection. - $config.data.collWithMigrations = "out_mode_replace_documents"; + $config.data.collWithMigrations = "agg_merge_when_matched_replace_with_new"; $config.data.threadRunCount = 0; $config.states.aggregate = function aggregate(db, collName, connCache) { @@ -22,10 +22,16 @@ var $config = extendWorkload($config, function($config, $super) { // subsequent runs. const res = db[collName].aggregate([ {$addFields: {_id: this.tid, count: this.threadRunCount}}, - {$out: {to: this.collWithMigrations, mode: "replaceDocuments"}}, + { + $merge: { + into: this.collWithMigrations, + whenMatched: "replaceWithNew", + whenNotMatched: "insert" + } + }, ]); - // $out should always return 0 documents. + // $merge should always return 0 documents. assert.eq(0, res.itcount()); // If running with causal consistency, the writes may not have propagated to the secondaries // yet. diff --git a/jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js b/jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js index ededefa5ba0..ef8f5ab6c04 100644 --- a/jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js +++ b/jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js @@ -1,10 +1,10 @@ 'use strict'; /** - * agg_out_mode_insert_documents.js + * agg_merge_when_not_matched_insert.js * - * Tests $out with mode "insertDocuments" concurrently with moveChunk operations on the output - * collection. + * Tests $merge with "whenNotMatched" set to "insert" concurrently with moveChunk operations on the + * output collection. * * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off, * requires_non_retryable_writes]] @@ -14,7 +14,7 @@ load('jstests/concurrency/fsm_workloads/agg_with_chunk_migrations.js'); // for var $config = extendWorkload($config, function($config, $super) { // Set the collection to run concurrent moveChunk operations as the output collection. - $config.data.collWithMigrations = "out_mode_insert_documents"; + $config.data.collWithMigrations = "agg_merge_when_not_matched_insert"; $config.data.threadRunCount = 0; $config.states.aggregate = function aggregate(db, collName, connCache) { @@ -26,10 +26,13 @@ var $config = extendWorkload($config, function($config, $super) { "_id.doc": "$_id" } }, - {$out: {to: this.collWithMigrations, mode: "insertDocuments"}}, + { + $merge: + {into: this.collWithMigrations, whenMatched: "fail", whenNotMatched: "insert"} + }, ]); - // $out should always return 0 documents. + // $merge should always return 0 documents. assert.eq(0, res.itcount()); // If running with causal consistency, the writes may not have propagated to the secondaries // yet. diff --git a/jstests/core/txns/prepare_conflict_aggregation_behavior.js b/jstests/core/txns/prepare_conflict_aggregation_behavior.js index 0df15e63065..4a4fd38d20b 100644 --- a/jstests/core/txns/prepare_conflict_aggregation_behavior.js +++ b/jstests/core/txns/prepare_conflict_aggregation_behavior.js @@ -35,9 +35,12 @@ assert.commandWorked(sessionOutColl.update({_id: 0}, {a: 1})); let prepareTimestamp = PrepareHelpers.prepareTransaction(session); - jsTestLog("Test that reads from an aggregation pipeline with $out don't block on prepare" + + jsTestLog("Test that reads from an aggregation pipeline with $merge don't block on prepare" + " conflicts"); - testColl.aggregate([{$addFields: {b: 1}}, {$out: {to: outCollName, mode: "insertDocuments"}}]); + testColl.aggregate([ + {$addFields: {b: 1}}, + {$merge: {into: outCollName, whenMatched: "fail", whenNotMatched: "insert"}} + ]); // Make sure that we can see the inserts from the aggregation but not the updates from the // prepared transaction. @@ -49,7 +52,10 @@ prepareTimestamp = PrepareHelpers.prepareTransaction(session); jsTestLog("Test that writes from an aggregation pipeline block on prepare conflicts"); - let pipeline = [{$addFields: {c: 1}}, {$out: {to: outCollName, mode: "replaceDocuments"}}]; + let pipeline = [ + {$addFields: {c: 1}}, + {$merge: {into: outCollName, whenMatched: "replaceWithNew", whenNotMatched: "insert"}} + ]; assert.commandFailedWithCode(testDB.runCommand({ aggregate: collName, pipeline: pipeline, @@ -63,7 +69,7 @@ assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp)); - // Make sure that the $out pipeline works once the transaction is committed. + // Make sure that the $merge pipeline works once the transaction is committed. testColl.aggregate(pipeline); assert.eq([{_id: 0}, {_id: 1, c: 1}], outColl.find().toArray()); diff --git a/jstests/core/views/views_aggregation.js b/jstests/core/views/views_aggregation.js index 112e8c963a9..f79af4e882e 100644 --- a/jstests/core/views/views_aggregation.js +++ b/jstests/core/views/views_aggregation.js @@ -78,22 +78,42 @@ assertErrorCode(coll, [{$out: {to: "emptyPipelineView", mode: "replaceCollection"}}], ErrorCodes.CommandNotSupportedOnView); + // Test that the $merge stage errors when writing to a view namespace. + assertErrorCode( + coll, + [{$merge: {into: "emptyPipelineView", whenMatched: "fail", whenNotMatched: "insert"}}], + ErrorCodes.CommandNotSupportedOnView); assertErrorCode(coll, - [{$out: {to: "emptyPipelineView", mode: "insertDocuments"}}], - ErrorCodes.CommandNotSupportedOnView); - assertErrorCode(coll, - [{$out: {to: "emptyPipelineView", mode: "replaceDocuments"}}], + [{ + $merge: { + into: "emptyPipelineView", + whenMatched: "replaceWithNew", + whenNotMatched: "insert" + } + }], ErrorCodes.CommandNotSupportedOnView); - // Test that the $out stage errors when writing to a view namespace in a foreign database. + // Test that the $merge stage errors when writing to a view namespace in a foreign database. let foreignDB = db.getSiblingDB("views_aggregation_foreign"); foreignDB.view.drop(); assert.commandWorked(foreignDB.createView("view", "coll", [])); assertErrorCode(coll, - [{$out: {db: foreignDB.getName(), to: "view", mode: "insertDocuments"}}], + [{ + $merge: { + into: {db: foreignDB.getName(), coll: "view"}, + whenMatched: "fail", + whenNotMatched: "insert" + } + }], ErrorCodes.CommandNotSupportedOnView); assertErrorCode(coll, - [{$out: {db: foreignDB.getName(), to: "view", mode: "replaceDocuments"}}], + [{ + $merge: { + into: {db: foreignDB.getName(), coll: "view"}, + whenMatched: "replaceWithNew", + whenNotMatched: "insert" + } + }], ErrorCodes.CommandNotSupportedOnView); // TODO (SERVER-36832): When $out to foreign database is allowed with "replaceCollection", this // should fail with ErrorCodes.CommandNotSupportedOnView. diff --git a/jstests/noPassthrough/merge_max_time_ms.js b/jstests/noPassthrough/merge_max_time_ms.js new file mode 100644 index 00000000000..6cab88f0270 --- /dev/null +++ b/jstests/noPassthrough/merge_max_time_ms.js @@ -0,0 +1,264 @@ +/** + * Test that an aggregation with a $merge stage obeys the maxTimeMS. + * @tags: [requires_sharding, requires_replication] + */ +(function() { + load("jstests/aggregation/extras/out_helpers.js"); // For withEachMergeMode(). + load("jstests/libs/fixture_helpers.js"); // For isMongos(). + load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow. + + const kDBName = "test"; + const kSourceCollName = "merge_max_time_ms_source"; + const kDestCollName = "merge_max_time_ms_dest"; + const nDocs = 10; + + /** + * Helper for populating the collection. + */ + function insertDocs(coll) { + for (let i = 0; i < nDocs; i++) { + assert.commandWorked(coll.insert({_id: i})); + } + } + + /** + * Wait until the server sets its CurOp "msg" to the failpoint name, indicating that it's + * hanging. + */ + function waitUntilServerHangsOnFailPoint(conn, fpName) { + // Be sure that the server is hanging on the failpoint. + assert.soon(function() { + const filter = {"msg": fpName}; + const ops = conn.getDB("admin") + .aggregate([{$currentOp: {allUsers: true}}, {$match: filter}]) + .toArray(); + return ops.length == 1; + }); + } + + /** + * Given a $merge parameters mongod connection, run a $out aggregation against 'conn' which + * hangs on the given failpoint and ensure that the $out maxTimeMS expires. + */ + function forceAggregationToHangAndCheckMaxTimeMsExpires( + whenMatched, whenNotMatched, conn, failPointName) { + // Use a short maxTimeMS so that the test completes in a reasonable amount of time. We will + // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not + // prematurely time out. + const maxTimeMS = 1000 * 2; + + // Enable a failPoint so that the write will hang. + let failpointCommand = { + configureFailPoint: failPointName, + mode: "alwaysOn", + data: {nss: kDBName + "." + kDestCollName} + }; + + assert.commandWorked(conn.getDB("admin").runCommand(failpointCommand)); + + // Make sure we don't run out of time before the failpoint is hit. + assert.commandWorked(conn.getDB("admin").runCommand( + {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"})); + + // Build the parallel shell function. + let shellStr = `const sourceColl = db['${kSourceCollName}'];`; + shellStr += `const destColl = db['${kDestCollName}'];`; + shellStr += `const maxTimeMS = ${maxTimeMS};`; + shellStr += `const whenMatched = ${tojson(whenMatched)};`; + shellStr += `const whenNotMatched = '${whenNotMatched}';`; + const runAggregate = function() { + const pipeline = [{ + $merge: { + into: destColl.getName(), + whenMatched: whenMatched, + whenNotMatched: whenNotMatched + } + }]; + const err = assert.throws(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS})); + assert.eq(err.code, ErrorCodes.MaxTimeMSExpired, "expected aggregation to fail"); + }; + shellStr += `(${runAggregate.toString()})();`; + const awaitShell = startParallelShell(shellStr, conn.port); + + waitUntilServerHangsOnFailPoint(conn, failPointName); + + assert.commandWorked(conn.getDB("admin").runCommand( + {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"})); + + // The aggregation running in the parallel shell will hang on the failpoint, burning + // its time. Wait until the maxTimeMS has definitely expired. + sleep(maxTimeMS + 2000); + + // Now drop the failpoint, allowing the aggregation to proceed. It should hit an + // interrupt check and terminate immediately. + assert.commandWorked( + conn.getDB("admin").runCommand({configureFailPoint: failPointName, mode: "off"})); + + // Wait for the parallel shell to finish. + assert.eq(awaitShell(), 0); + } + + function runUnshardedTest(whenMatched, whenNotMatched, conn) { + jsTestLog("Running unsharded test in whenMatched: " + whenMatched + " whenNotMatched: " + + whenNotMatched); + // The target collection will always be empty so we do not test the setting that will cause + // only failure. + if (whenNotMatched == "fail") { + return; + } + + const sourceColl = conn.getDB(kDBName)[kSourceCollName]; + const destColl = conn.getDB(kDBName)[kDestCollName]; + assert.commandWorked(destColl.remove({})); + + // Be sure we're able to read from a cursor with a maxTimeMS set on it. + (function() { + // Use a long maxTimeMS, since we expect the operation to finish. + const maxTimeMS = 1000 * 600; + const pipeline = [{ + $merge: { + into: destColl.getName(), + whenMatched: whenMatched, + whenNotMatched: whenNotMatched + } + }]; + assert.doesNotThrow(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS})); + })(); + + assert.commandWorked(destColl.remove({})); + + // Force the aggregation to hang while the batch is being written. The failpoint changes + // depending on the mode. If 'whenMatched' is set to "fail" then the implementation will end + // up issuing insert commands instead of updates. + const kFailPointName = + whenMatched == "fail" ? "hangDuringBatchInsert" : "hangDuringBatchUpdate"; + forceAggregationToHangAndCheckMaxTimeMsExpires( + whenMatched, whenNotMatched, conn, kFailPointName); + + assert.commandWorked(destColl.remove({})); + + // Force the aggregation to hang while the batch is being built. + forceAggregationToHangAndCheckMaxTimeMsExpires( + whenMatched, whenNotMatched, conn, "hangWhileBuildingDocumentSourceMergeBatch"); + } + + // Run on a standalone. + (function() { + const conn = MongoRunner.runMongod({}); + assert.neq(null, conn, 'mongod was unable to start up'); + insertDocs(conn.getDB(kDBName)[kSourceCollName]); + withEachMergeMode( + (mode) => runUnshardedTest(mode.whenMatchedMode, mode.whenNotMatchedMode, conn)); + MongoRunner.stopMongod(conn); + })(); + + // Runs a $merge against 'mongosConn' and verifies that the maxTimeMS value is included in the + // command sent to mongod. Since the actual timeout can unreliably happen in mongos before even + // reaching the shard, we instead set a very large timeout and verify that the command sent to + // mongod includes the maxTimeMS. + function runShardedTest(whenMatched, whenNotMatched, mongosConn, mongodConn, comment) { + jsTestLog("Running sharded test in whenMatched: " + whenMatched + " whenNotMatched: " + + whenNotMatched); + // The target collection will always be empty so we do not test the setting that will cause + // only failure. + if (whenNotMatched == "fail") { + return; + } + + // Set a large timeout since we expect the command to finish. + const maxTimeMS = 1000 * 20; + + const sourceColl = mongosConn.getDB(kDBName)[kSourceCollName]; + const destColl = mongosConn.getDB(kDBName)[kDestCollName]; + assert.commandWorked(destColl.remove({})); + + // Make sure we don't timeout in mongos before even reaching the shards. + assert.commandWorked(mongosConn.getDB("admin").runCommand( + {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"})); + + const cursor = sourceColl.aggregate([{ + $merge: { + into: destColl.getName(), + whenMatched: whenMatched, + whenNotMatched: whenNotMatched + } + }], + {maxTimeMS: maxTimeMS, comment: comment}); + assert(!cursor.hasNext()); + + // Filter the profiler entries on the existence of $merge, since aggregations through mongos + // will include an extra aggregation with an empty pipeline to establish cursors on the + // shards. + assert.soon(function() { + return mongodConn.getDB(kDBName) + .system.profile + .find({ + "command.aggregate": kSourceCollName, + "command.pipeline.$merge": {"$exists": true}, + "command.comment": comment, + "command.maxTimeMS": maxTimeMS, + }) + .itcount() == 1; + }); + + assert.commandWorked(mongosConn.getDB("admin").runCommand( + {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"})); + } + + // Run on a sharded cluster. + (function() { + const st = new ShardingTest({shards: 2}); + + // Ensure shard 0 is the primary shard. This is so that the $merge stage is guaranteed to + // run on it. + assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: kDBName})); + st.ensurePrimaryShard(kDBName, st.shard0.name); + + // Set up the source collection to be sharded in a way such that each node will have some + // documents for the remainder of the test. + // shard 0: [MinKey, 5] + // shard 1: [5, MaxKey] + st.shardColl(kSourceCollName, + {_id: 1}, // key + {_id: 5}, // split + {_id: 6}, // move + kDBName); + insertDocs(st.s.getDB(kDBName)[kSourceCollName]); + + // Start the profiler on each shard so that we can examine the $out's maxTimeMS. + assert.commandWorked(st.shard0.getDB(kDBName).setProfilingLevel(2)); + assert.commandWorked(st.shard1.getDB(kDBName).setProfilingLevel(2)); + + // // Run the test with 'destColl' unsharded. + withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode, + mode.whenNotMatchedMode, + st.s, + st.shard0, + mode + "_unshardedDest")); + + // Run the test with 'destColl' sharded. This means that writes will be sent to both + // shards, and if either one hangs, the MaxTimeMS will expire. + // Shard the destination collection. + st.shardColl(kDestCollName, + {_id: 1}, // key + {_id: 5}, // split + {_id: 6}, // move + kDBName); + + jsTestLog("Running test forcing shard " + st.shard0.name + " to hang"); + withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode, + mode.whenNotMatchedMode, + st.s, + st.shard0, + mode + "_shardedDest_" + st.shard0.name)); + + jsTestLog("Running test forcing shard " + st.shard1.name + " to hang"); + withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode, + mode.whenNotMatchedMode, + st.s, + st.shard1, + mode + "_shardedDest_" + st.shard1.name)); + + st.stop(); + })(); +})(); diff --git a/jstests/noPassthrough/out_max_time_ms.js b/jstests/noPassthrough/out_max_time_ms.js index 16c5b129261..578ab60a6e2 100644 --- a/jstests/noPassthrough/out_max_time_ms.js +++ b/jstests/noPassthrough/out_max_time_ms.js @@ -3,9 +3,8 @@ * @tags: [requires_sharding, requires_replication] */ (function() { - load("jstests/aggregation/extras/out_helpers.js"); // For withEachOutMode(). - load("jstests/libs/fixture_helpers.js"); // For isMongos(). - load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow. + load("jstests/libs/fixture_helpers.js"); // For isMongos(). + load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow. const kDBName = "test"; const kSourceCollName = "out_max_time_ms_source"; @@ -37,27 +36,21 @@ } /** - * Given a $out mode and a mongod connection, run a $out aggregation against 'conn' which hangs - * on the given failpoint and ensure that the $out maxTimeMS expires. + * Given a mongod connection, run a $out aggregation against 'conn' which hangs on the given + * failpoint and ensure that the $out maxTimeMS expires. */ - function forceAggregationToHangAndCheckMaxTimeMsExpires(mode, conn, failPointName) { + function forceAggregationToHangAndCheckMaxTimeMsExpires(conn, failPointName) { // Use a short maxTimeMS so that the test completes in a reasonable amount of time. We will - // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not - // prematurely time out. + // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not prematurely + // time out. const maxTimeMS = 1000 * 2; // Enable a failPoint so that the write will hang. let failpointCommand = { configureFailPoint: failPointName, mode: "alwaysOn", - data: {nss: kDBName + "." + kDestCollName} }; - // For mode "replaceCollection", the namespace of the writes will be to a temp namespace so - // remove the restriction on nss. - if (mode == "replaceCollection") - delete failpointCommand.data; - assert.commandWorked(conn.getDB("admin").runCommand(failpointCommand)); // Make sure we don't run out of time before the failpoint is hit. @@ -68,9 +61,8 @@ let shellStr = `const sourceColl = db['${kSourceCollName}'];`; shellStr += `const destColl = db['${kDestCollName}'];`; shellStr += `const maxTimeMS = ${maxTimeMS};`; - shellStr += `const mode = '${mode}';`; const runAggregate = function() { - const pipeline = [{$out: {to: destColl.getName(), mode: mode}}]; + const pipeline = [{$out: destColl.getName()}]; const err = assert.throws(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS})); assert.eq(err.code, ErrorCodes.MaxTimeMSExpired, "expected aggregation to fail"); }; @@ -95,8 +87,8 @@ assert.eq(awaitShell(), 0); } - function runUnshardedTest(mode, conn) { - jsTestLog("Running test in mode " + mode); + function runUnshardedTest(conn) { + jsTestLog("Running unsharded test"); const sourceColl = conn.getDB(kDBName)[kSourceCollName]; const destColl = conn.getDB(kDBName)[kDestCollName]; @@ -106,7 +98,7 @@ (function() { // Use a long maxTimeMS, since we expect the operation to finish. const maxTimeMS = 1000 * 600; - const pipeline = [{$out: {to: destColl.getName(), mode: mode}}]; + const pipeline = [{$out: destColl.getName()}]; const cursor = sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS}); assert(!cursor.hasNext()); assert.eq(destColl.countDocuments({_id: {$exists: true}}), nDocs); @@ -115,18 +107,14 @@ assert.commandWorked(destColl.remove({})); // Force the aggregation to hang while the batch is being written. - const kFailPointName = - mode == "replaceDocuments" ? "hangDuringBatchUpdate" : "hangDuringBatchInsert"; - forceAggregationToHangAndCheckMaxTimeMsExpires(mode, conn, kFailPointName); + const kFailPointName = "hangDuringBatchInsert"; + forceAggregationToHangAndCheckMaxTimeMsExpires(conn, kFailPointName); assert.commandWorked(destColl.remove({})); // Force the aggregation to hang while the batch is being built. - forceAggregationToHangAndCheckMaxTimeMsExpires( - mode, - conn, - mode == "replaceCollection" ? "hangWhileBuildingDocumentSourceOutBatch" - : "hangWhileBuildingDocumentSourceMergeBatch"); + forceAggregationToHangAndCheckMaxTimeMsExpires(conn, + "hangWhileBuildingDocumentSourceOutBatch"); } // Run on a standalone. @@ -134,98 +122,7 @@ const conn = MongoRunner.runMongod({}); assert.neq(null, conn, 'mongod was unable to start up'); insertDocs(conn.getDB(kDBName)[kSourceCollName]); - withEachOutMode((mode) => runUnshardedTest(mode, conn)); + runUnshardedTest(conn); MongoRunner.stopMongod(conn); })(); - - // Runs a $out against 'mongosConn' and verifies that the maxTimeMS value is included in the - // command sent to mongod. Since the actual timeout can unreliably happen in mongos before even - // reaching the shard, we instead set a very large timeout and verify that the command sent to - // mongod includes the maxTimeMS. - function runShardedTest(mode, mongosConn, mongodConn, comment) { - jsTestLog("Running sharded test in mode " + mode); - if (mode == "replaceCollection") { - return; - } - - // Set a large timeout since we expect the command to finish. - const maxTimeMS = 1000 * 20; - - const sourceColl = mongosConn.getDB(kDBName)[kSourceCollName]; - const destColl = mongosConn.getDB(kDBName)[kDestCollName]; - assert.commandWorked(destColl.remove({})); - - // Make sure we don't timeout in mongos before even reaching the shards. - assert.commandWorked(mongosConn.getDB("admin").runCommand( - {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"})); - - const cursor = sourceColl.aggregate([{$out: {to: destColl.getName(), mode: mode}}], - {maxTimeMS: maxTimeMS, comment: comment}); - assert(!cursor.hasNext()); - - // Filter the profiler entries on the existence of $out, since aggregations through mongos - // will include an extra aggregation with an empty pipeline to establish cursors on the - // shards. - assert.soon(function() { - return mongodConn.getDB(kDBName) - .system.profile - .find({ - "command.aggregate": kSourceCollName, - "command.pipeline.$out": {"$exists": true}, - "command.comment": comment, - "command.maxTimeMS": maxTimeMS, - }) - .itcount() == 1; - }); - - assert.commandWorked(mongosConn.getDB("admin").runCommand( - {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"})); - } - - // Run on a sharded cluster. - (function() { - const st = new ShardingTest({shards: 2}); - - // Ensure shard 0 is the primary shard. This is so that the $out stage is guaranteed to - // run on it. - assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: kDBName})); - st.ensurePrimaryShard(kDBName, st.shard0.name); - - // Set up the source collection to be sharded in a way such that each node will have some - // documents for the remainder of the test. - // shard 0: [MinKey, 5] - // shard 1: [5, MaxKey] - st.shardColl(kSourceCollName, - {_id: 1}, // key - {_id: 5}, // split - {_id: 6}, // move - kDBName); - insertDocs(st.s.getDB(kDBName)[kSourceCollName]); - - // Start the profiler on each shard so that we can examine the $out's maxTimeMS. - assert.commandWorked(st.shard0.getDB(kDBName).setProfilingLevel(2)); - assert.commandWorked(st.shard1.getDB(kDBName).setProfilingLevel(2)); - - // // Run the test with 'destColl' unsharded. - withEachOutMode((mode) => runShardedTest(mode, st.s, st.shard0, mode + "_unshardedDest")); - - // Run the test with 'destColl' sharded. This means that writes will be sent to both - // shards, and if either one hangs, the MaxTimeMS will expire. - // Shard the destination collection. - st.shardColl(kDestCollName, - {_id: 1}, // key - {_id: 5}, // split - {_id: 6}, // move - kDBName); - - jsTestLog("Running test forcing shard " + st.shard0.name + " to hang"); - withEachOutMode((mode) => runShardedTest( - mode, st.s, st.shard0, mode + "_shardedDest_" + st.shard0.name)); - - jsTestLog("Running test forcing shard " + st.shard1.name + " to hang"); - withEachOutMode((mode) => runShardedTest( - mode, st.s, st.shard1, mode + "_shardedDest_" + st.shard1.name)); - - st.stop(); - })(); })(); diff --git a/jstests/noPassthrough/out_majority_read.js b/jstests/noPassthrough/out_merge_majority_read.js index e27f98033f8..32237708dc0 100644 --- a/jstests/noPassthrough/out_majority_read.js +++ b/jstests/noPassthrough/out_merge_majority_read.js @@ -1,5 +1,5 @@ /** - * Tests that $out with readConcern majority will only see committed data. + * Tests that $out and $merge with readConcern majority will only see committed data. * * Each operation is tested on a single node, and (if supported) through mongos on both sharded and * unsharded collections. Mongos doesn't directly handle readConcern majority, but these tests @@ -48,7 +48,7 @@ assert.commandWorked(targetReplaceDocsColl.insert({_id: 1, state: 'before'})); setCommittedSnapshot(makeSnapshot()); - // This insert will not be visible to $out. + // This insert will not be visible to $merge. assert.commandWorked(sourceColl.insert({_id: 2, state: 'before'})); assert.commandWorked(targetReplaceDocsColl.insert({_id: 2, state: 'before'})); // Similarly this update will not be visible. @@ -59,17 +59,17 @@ let res = sourceColl.aggregate([], {readConcern: {level: 'majority'}}); assert.eq(res.itcount(), 1); - // Run $out with the insertDocuments mode. It will pick only the first document. Also it - // will not see the update ('after'). + // Run $merge with whenNotMatched set to "insert". It will pick only the first document. + // Also it will not see the update ('after'). res = sourceColl.aggregate( [ {$match: {state: 'before'}}, - {$project: {state: 'out'}}, + {$project: {state: 'merge'}}, { - $out: { - to: targetColl.getName(), - db: targetColl.getDB().getName(), - mode: "insertDocuments" + $merge: { + into: {db: targetColl.getDB().getName(), coll: targetColl.getName()}, + whenMatched: "fail", + whenNotMatched: "insert" } } ], @@ -77,22 +77,24 @@ assert.eq(res.itcount(), 0); - // Validate the insertDocuments results. res = targetColl.find().sort({_id: 1}); - // Only a single document is visible ($out did not see the second insert). - assert.docEq(res.next(), {_id: 1, state: 'out'}); + // Only a single document is visible ($merge did not see the second insert). + assert.docEq(res.next(), {_id: 1, state: 'merge'}); assert(res.isExhausted()); - // The same $out but in replaceDocuments. + // The same $merge but with whenMatched set to "replaceWithNew". res = sourceColl.aggregate( [ {$match: {state: 'before'}}, - {$project: {state: 'out'}}, + {$project: {state: 'merge'}}, { - $out: { - to: targetReplaceDocsColl.getName(), - db: targetReplaceDocsColl.getDB().getName(), - mode: "replaceDocuments" + $merge: { + into: { + db: targetReplaceDocsColl.getDB().getName(), + coll: targetReplaceDocsColl.getName() + }, + whenMatched: "replaceWithNew", + whenNotMatched: "insert" } } ], @@ -101,30 +103,29 @@ setCommittedSnapshot(makeSnapshot()); - // Validate the replaceDocuments results. res = targetReplaceDocsColl.find().sort({_id: 1}); - // The first document must overwrite the update that the read portion of $out did not see. - assert.docEq(res.next(), {_id: 1, state: 'out'}); - // The second document is the result of the independent insert that $out did not see. + // The first document must overwrite the update that the read portion of $merge did not see. + assert.docEq(res.next(), {_id: 1, state: 'merge'}); + // The second document is the result of the independent insert that $merge did not see. assert.docEq(res.next(), {_id: 2, state: 'before'}); assert(res.isExhausted()); assert.commandWorked(targetColl.remove({})); setCommittedSnapshot(makeSnapshot()); - // Insert a document that will collide with $out insert. The insert is not majority + // Insert a document that will collide with $merge insert. The insert is not majority // commited. assert.commandWorked(targetColl.insert({_id: 1, state: 'collision'})); res = db.runCommand({ aggregate: sourceColl.getName(), pipeline: [ - {$project: {state: 'out'}}, + {$project: {state: 'merge'}}, { - $out: { - to: targetColl.getName(), - db: targetColl.getDB().getName(), - mode: "insertDocuments" + $merge: { + into: {db: targetColl.getDB().getName(), coll: targetColl.getName()}, + whenMatched: "fail", + whenNotMatched: "insert" } } ], @@ -138,16 +139,16 @@ assert.commandWorked(targetColl.remove({_id: 1})); assert.commandWorked(targetColl.remove({_id: 2})); - // $out should successfuly 'overwrite' the collection as it is 'empty' (not majority). + // $merge should successfuly 'overwrite' the collection as it is 'empty' (not majority). res = targetReplaceDocsColl.aggregate( [ {$match: {state: 'before'}}, - {$project: {state: 'out'}}, + {$project: {state: 'merge'}}, { - $out: { - to: targetColl.getName(), - db: targetColl.getDB().getName(), - mode: "insertDocuments" + $merge: { + into: {db: targetColl.getDB().getName(), coll: targetColl.getName()}, + whenMatched: "fail", + whenNotMatched: "insert" } } ], @@ -157,10 +158,9 @@ setCommittedSnapshot(makeSnapshot()); - // Validate the insertDocuments results. res = targetColl.find().sort({_id: 1}); - // Only a single document is visible ($out did not see the second insert). - assert.docEq(res.next(), {_id: 2, state: 'out'}); + // Only a single document is visible ($merge did not see the second insert). + assert.docEq(res.next(), {_id: 2, state: 'merge'}); assert(res.isExhausted()); } diff --git a/jstests/replsets/linearizable_read_concern.js b/jstests/replsets/linearizable_read_concern.js index 78867709a8f..b211ebe5f3b 100644 --- a/jstests/replsets/linearizable_read_concern.js +++ b/jstests/replsets/linearizable_read_concern.js @@ -78,10 +78,16 @@ load('jstests/libs/write_concern_util.js'); assert.eq(opTimeCmd.code, ErrorCodes.FailedToParse); // A $out aggregation is not allowed with readConcern level "linearizable". - let result = assert.throws( - () => primary.getDB("test").foo.aggregate([{$out: {to: "out", mode: "replaceDocuments"}}], - {readConcern: {level: "linearizable"}})); - assert.eq(result.code, ErrorCodes.InvalidOptions); + let outResult = assert.throws(() => primary.getDB("test").foo.aggregate( + [{$out: "out"}], {readConcern: {level: "linearizable"}})); + assert.eq(outResult.code, ErrorCodes.InvalidOptions); + + // A $merge aggregation is not allowed with readConcern level "linearizable". + let mergeResult = assert.throws( + () => primary.getDB("test").foo.aggregate( + [{$merge: {into: "out", whenMatched: "replaceWithNew", whenNotMatched: "insert"}}], + {readConcern: {level: "linearizable"}})); + assert.eq(mergeResult.code, ErrorCodes.InvalidOptions); primary = replTest.getPrimary(); |