-rw-r--r--  buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml  5
-rw-r--r--  jstests/aggregation/sharded_agg_cleanup_on_error.js  9
-rw-r--r--  jstests/auth/lib/commands_lib.js  20
-rw-r--r--  jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js (renamed from jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js)  18
-rw-r--r--  jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js (renamed from jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js)  15
-rw-r--r--  jstests/core/txns/prepare_conflict_aggregation_behavior.js  14
-rw-r--r--  jstests/core/views/views_aggregation.js  34
-rw-r--r--  jstests/noPassthrough/merge_max_time_ms.js  264
-rw-r--r--  jstests/noPassthrough/out_max_time_ms.js  135
-rw-r--r--  jstests/noPassthrough/out_merge_majority_read.js (renamed from jstests/noPassthrough/out_majority_read.js)  74
-rw-r--r--  jstests/replsets/linearizable_read_concern.js  14
11 files changed, 410 insertions, 192 deletions
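
This commit migrates these tests from the pre-4.2 $out modes to the $merge stage. The translation is mechanical throughout; a minimal sketch of the mapping, using a hypothetical target collection named "target":

    // Old: {$out: {to: "target", mode: "insertDocuments"}}
    const mergeInsert =
        {$merge: {into: "target", whenMatched: "fail", whenNotMatched: "insert"}};

    // Old: {$out: {to: "target", mode: "replaceDocuments"}}
    const mergeReplace =
        {$merge: {into: "target", whenMatched: "replaceWithNew", whenNotMatched: "insert"}};

    // The old {$out: {to: ..., mode: "replaceCollection"}} behavior is covered by the
    // plain string form that remains: {$out: "target"}.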
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml
index b827b8d9fac..505cd412179 100644
--- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml
+++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_stepdowns.yml
@@ -184,8 +184,9 @@ selector:
# Uses non-retryable commands in the same state function as a command not supported in a
# transaction.
- - jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js
- - jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js
+ - jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js
+ - jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js
+
# TODO SERVER-40713 moveChunk is not considered retryable by the network retry override.
- jstests/concurrency/fsm_workloads/agg_with_chunk_migrations.js
diff --git a/jstests/aggregation/sharded_agg_cleanup_on_error.js b/jstests/aggregation/sharded_agg_cleanup_on_error.js
index c8122290afe..d84c37b3827 100644
--- a/jstests/aggregation/sharded_agg_cleanup_on_error.js
+++ b/jstests/aggregation/sharded_agg_cleanup_on_error.js
@@ -114,12 +114,15 @@
// Run an aggregation which is eligible for $exchange. This should assert because of
// the failpoint. Add a $group stage to force an exchange-eligible split of the pipeline
- // before the $out. Without the $group we won't use the exchange optimization and instead
- // will send the $out to each shard.
+ // before the $merge. Without the $group we won't use the exchange optimization and instead
+ // will send the $merge to each shard.
st.shardColl(mongosDB.target, {_id: 1}, {_id: 0}, {_id: 1}, kDBName, false);
assertErrorCode(
coll,
- [{$group: {_id: "$fakeShardKey"}}, {$out: {to: "target", mode: "replaceDocuments"}}],
+ [
+ {$group: {_id: "$fakeShardKey"}},
+ {$merge: {into: "target", whenMatched: "replaceWithNew", whenNotMatched: "insert"}}
+ ],
ErrorCodes.FailPointEnabled);
// Neither mongos nor the shards should leave cursors open.
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index 2580ebe6635..86b560718c7 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -1113,11 +1113,17 @@ var authCommandsLib = {
]
},
{
- testname: "aggregate_out_insert_documents",
+ testname: "aggregate_merge_insert_documents",
command: function(state, args) {
return {
aggregate: "foo",
- pipeline: [{$out: {db: args.targetDB, to: "foo_out", mode: "insertDocuments"}}],
+ pipeline: [{
+ $merge: {
+ into: {db: args.targetDB, coll: "foo_out"},
+ whenMatched: "fail",
+ whenNotMatched: "insert"
+ }
+ }],
cursor: {},
bypassDocumentValidation: args.bypassDocumentValidation,
};
@@ -1181,11 +1187,17 @@ var authCommandsLib = {
]
},
{
- testname: "aggregate_out_replace_documents",
+ testname: "aggregate_merge_replace_documents",
command: function(state, args) {
return {
aggregate: "foo",
- pipeline: [{$out: {db: args.targetDB, to: "foo_out", mode: "replaceDocuments"}}],
+ pipeline: [{
+ $merge: {
+ into: {db: args.targetDB, coll: "foo_out"},
+ whenMatched: "replaceWithNew",
+ whenNotMatched: "insert"
+ }
+ }],
cursor: {},
bypassDocumentValidation: args.bypassDocumentValidation,
};
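
These auth tests also exercise the cross-database form of $merge, where "into" is an object rather than a bare collection name. A standalone sketch of that form (database and collection names hypothetical):

    // Write the pipeline's output into a collection in another database.
    db.foo.aggregate([{
        $merge: {
            into: {db: "otherDB", coll: "foo_out"},
            whenMatched: "fail",
            whenNotMatched: "insert"
        }
    }]);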
diff --git a/jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js b/jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js
index 09676948073..0739be35561 100644
--- a/jstests/concurrency/fsm_workloads/agg_out_mode_replace_documents.js
+++ b/jstests/concurrency/fsm_workloads/agg_merge_when_matched_replace_with_new.js
@@ -1,10 +1,10 @@
'use strict';
/**
- * agg_out_mode_replace_documents.js
+ * agg_merge_when_matched_replace_with_new.js
*
- * Tests $out with mode "replaceDocuments" concurrently with moveChunk operations on the output
- * collection.
+ * Tests $merge with whenMatched set to "replaceWithNew" concurrently with moveChunk operations on
+ * the output collection.
*
* @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off,
* requires_non_retryable_writes]
@@ -14,7 +14,7 @@ load('jstests/concurrency/fsm_workloads/agg_with_chunk_migrations.js'); // for
var $config = extendWorkload($config, function($config, $super) {
// Set the collection to run concurrent moveChunk operations as the output collection.
- $config.data.collWithMigrations = "out_mode_replace_documents";
+ $config.data.collWithMigrations = "agg_merge_when_matched_replace_with_new";
$config.data.threadRunCount = 0;
$config.states.aggregate = function aggregate(db, collName, connCache) {
@@ -22,10 +22,16 @@ var $config = extendWorkload($config, function($config, $super) {
// subsequent runs.
const res = db[collName].aggregate([
{$addFields: {_id: this.tid, count: this.threadRunCount}},
- {$out: {to: this.collWithMigrations, mode: "replaceDocuments"}},
+ {
+ $merge: {
+ into: this.collWithMigrations,
+ whenMatched: "replaceWithNew",
+ whenNotMatched: "insert"
+ }
+ },
]);
- // $out should always return 0 documents.
+ // $merge should always return 0 documents.
assert.eq(0, res.itcount());
// If running with causal consistency, the writes may not have propagated to the secondaries
// yet.
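
Both renamed workloads follow the same FSM composition pattern: load a base workload, then extend its $config. A sketch of the skeleton (the output collection name here is hypothetical):

    load('jstests/concurrency/fsm_workloads/agg_with_chunk_migrations.js');  // defines $config

    var $config = extendWorkload($config, function($config, $super) {
        // Run concurrent moveChunk operations against the $merge output collection.
        $config.data.collWithMigrations = "my_output_collection";
        $config.states.aggregate = function aggregate(db, collName, connCache) {
            // ... run the $merge pipeline shown in the hunk above ...
        };
        return $config;
    });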
diff --git a/jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js b/jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js
index ededefa5ba0..ef8f5ab6c04 100644
--- a/jstests/concurrency/fsm_workloads/agg_out_mode_insert_documents.js
+++ b/jstests/concurrency/fsm_workloads/agg_merge_when_not_matched_insert.js
@@ -1,10 +1,10 @@
'use strict';
/**
- * agg_out_mode_insert_documents.js
+ * agg_merge_when_not_matched_insert.js
*
- * Tests $out with mode "insertDocuments" concurrently with moveChunk operations on the output
- * collection.
+ * Tests $merge with "whenNotMatched" set to "insert" concurrently with moveChunk operations on the
+ * output collection.
*
* @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off,
 * requires_non_retryable_writes]
@@ -14,7 +14,7 @@ load('jstests/concurrency/fsm_workloads/agg_with_chunk_migrations.js'); // for
var $config = extendWorkload($config, function($config, $super) {
// Set the collection to run concurrent moveChunk operations as the output collection.
- $config.data.collWithMigrations = "out_mode_insert_documents";
+ $config.data.collWithMigrations = "agg_merge_when_not_matched_insert";
$config.data.threadRunCount = 0;
$config.states.aggregate = function aggregate(db, collName, connCache) {
@@ -26,10 +26,13 @@ var $config = extendWorkload($config, function($config, $super) {
"_id.doc": "$_id"
}
},
- {$out: {to: this.collWithMigrations, mode: "insertDocuments"}},
+ {
+ $merge:
+ {into: this.collWithMigrations, whenMatched: "fail", whenNotMatched: "insert"}
+ },
]);
- // $out should always return 0 documents.
+ // $merge should always return 0 documents.
assert.eq(0, res.itcount());
// If running with causal consistency, the writes may not have propagated to the secondaries
// yet.
diff --git a/jstests/core/txns/prepare_conflict_aggregation_behavior.js b/jstests/core/txns/prepare_conflict_aggregation_behavior.js
index 0df15e63065..4a4fd38d20b 100644
--- a/jstests/core/txns/prepare_conflict_aggregation_behavior.js
+++ b/jstests/core/txns/prepare_conflict_aggregation_behavior.js
@@ -35,9 +35,12 @@
assert.commandWorked(sessionOutColl.update({_id: 0}, {a: 1}));
let prepareTimestamp = PrepareHelpers.prepareTransaction(session);
- jsTestLog("Test that reads from an aggregation pipeline with $out don't block on prepare" +
+ jsTestLog("Test that reads from an aggregation pipeline with $merge don't block on prepare" +
" conflicts");
- testColl.aggregate([{$addFields: {b: 1}}, {$out: {to: outCollName, mode: "insertDocuments"}}]);
+ testColl.aggregate([
+ {$addFields: {b: 1}},
+ {$merge: {into: outCollName, whenMatched: "fail", whenNotMatched: "insert"}}
+ ]);
// Make sure that we can see the inserts from the aggregation but not the updates from the
// prepared transaction.
@@ -49,7 +52,10 @@
prepareTimestamp = PrepareHelpers.prepareTransaction(session);
jsTestLog("Test that writes from an aggregation pipeline block on prepare conflicts");
- let pipeline = [{$addFields: {c: 1}}, {$out: {to: outCollName, mode: "replaceDocuments"}}];
+ let pipeline = [
+ {$addFields: {c: 1}},
+ {$merge: {into: outCollName, whenMatched: "replaceWithNew", whenNotMatched: "insert"}}
+ ];
assert.commandFailedWithCode(testDB.runCommand({
aggregate: collName,
pipeline: pipeline,
@@ -63,7 +69,7 @@
assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
- // Make sure that the $out pipeline works once the transaction is committed.
+ // Make sure that the $merge pipeline works once the transaction is committed.
testColl.aggregate(pipeline);
assert.eq([{_id: 0}, {_id: 1, c: 1}], outColl.find().toArray());
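
For reference, the blocked-write assertion in this test pairs a short maxTimeMS with an expected timeout: while the transaction is prepared, the $merge write waits on the prepare conflict until the deadline passes. A sketch reusing the names from the hunk above, with a hypothetical timeout value:

    assert.commandFailedWithCode(testDB.runCommand({
        aggregate: collName,
        pipeline: pipeline,  // the $merge pipeline built above
        cursor: {},
        maxTimeMS: 5 * 1000  // hypothetical; long enough to reach the prepare conflict
    }),
                                 ErrorCodes.MaxTimeMSExpired);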
diff --git a/jstests/core/views/views_aggregation.js b/jstests/core/views/views_aggregation.js
index 112e8c963a9..f79af4e882e 100644
--- a/jstests/core/views/views_aggregation.js
+++ b/jstests/core/views/views_aggregation.js
@@ -78,22 +78,42 @@
assertErrorCode(coll,
[{$out: {to: "emptyPipelineView", mode: "replaceCollection"}}],
ErrorCodes.CommandNotSupportedOnView);
+ // Test that the $merge stage errors when writing to a view namespace.
+ assertErrorCode(
+ coll,
+ [{$merge: {into: "emptyPipelineView", whenMatched: "fail", whenNotMatched: "insert"}}],
+ ErrorCodes.CommandNotSupportedOnView);
assertErrorCode(coll,
- [{$out: {to: "emptyPipelineView", mode: "insertDocuments"}}],
- ErrorCodes.CommandNotSupportedOnView);
- assertErrorCode(coll,
- [{$out: {to: "emptyPipelineView", mode: "replaceDocuments"}}],
+ [{
+ $merge: {
+ into: "emptyPipelineView",
+ whenMatched: "replaceWithNew",
+ whenNotMatched: "insert"
+ }
+ }],
ErrorCodes.CommandNotSupportedOnView);
- // Test that the $out stage errors when writing to a view namespace in a foreign database.
+ // Test that the $merge stage errors when writing to a view namespace in a foreign database.
let foreignDB = db.getSiblingDB("views_aggregation_foreign");
foreignDB.view.drop();
assert.commandWorked(foreignDB.createView("view", "coll", []));
assertErrorCode(coll,
- [{$out: {db: foreignDB.getName(), to: "view", mode: "insertDocuments"}}],
+ [{
+ $merge: {
+ into: {db: foreignDB.getName(), coll: "view"},
+ whenMatched: "fail",
+ whenNotMatched: "insert"
+ }
+ }],
ErrorCodes.CommandNotSupportedOnView);
assertErrorCode(coll,
- [{$out: {db: foreignDB.getName(), to: "view", mode: "replaceDocuments"}}],
+ [{
+ $merge: {
+ into: {db: foreignDB.getName(), coll: "view"},
+ whenMatched: "replaceWithNew",
+ whenNotMatched: "insert"
+ }
+ }],
ErrorCodes.CommandNotSupportedOnView);
// TODO (SERVER-36832): When $out to foreign database is allowed with "replaceCollection", this
// should fail with ErrorCodes.CommandNotSupportedOnView.
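
A standalone reproduction of the failure these assertions expect (names hypothetical): $merge, like the $out modes it replaces, refuses to write to a view namespace.

    assert.commandWorked(db.createView("myView", "backingColl", []));
    const err = assert.throws(() => db.backingColl.aggregate(
        [{$merge: {into: "myView", whenMatched: "fail", whenNotMatched: "insert"}}]));
    assert.eq(err.code, ErrorCodes.CommandNotSupportedOnView);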
diff --git a/jstests/noPassthrough/merge_max_time_ms.js b/jstests/noPassthrough/merge_max_time_ms.js
new file mode 100644
index 00000000000..6cab88f0270
--- /dev/null
+++ b/jstests/noPassthrough/merge_max_time_ms.js
@@ -0,0 +1,264 @@
+/**
+ * Test that an aggregation with a $merge stage obeys the maxTimeMS.
+ * @tags: [requires_sharding, requires_replication]
+ */
+(function() {
+ load("jstests/aggregation/extras/out_helpers.js"); // For withEachMergeMode().
+ load("jstests/libs/fixture_helpers.js"); // For isMongos().
+ load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow.
+
+ const kDBName = "test";
+ const kSourceCollName = "merge_max_time_ms_source";
+ const kDestCollName = "merge_max_time_ms_dest";
+ const nDocs = 10;
+
+ /**
+ * Helper for populating the collection.
+ */
+ function insertDocs(coll) {
+ for (let i = 0; i < nDocs; i++) {
+ assert.commandWorked(coll.insert({_id: i}));
+ }
+ }
+
+ /**
+ * Wait until the server sets its CurOp "msg" to the failpoint name, indicating that it's
+ * hanging.
+ */
+ function waitUntilServerHangsOnFailPoint(conn, fpName) {
+ // Be sure that the server is hanging on the failpoint.
+ assert.soon(function() {
+ const filter = {"msg": fpName};
+ const ops = conn.getDB("admin")
+ .aggregate([{$currentOp: {allUsers: true}}, {$match: filter}])
+ .toArray();
+ return ops.length == 1;
+ });
+ }
+
+ /**
+ * Given $merge parameters and a mongod connection, run a $merge aggregation against 'conn'
+ * which hangs on the given failpoint and ensure that the aggregation's maxTimeMS expires.
+ */
+ function forceAggregationToHangAndCheckMaxTimeMsExpires(
+ whenMatched, whenNotMatched, conn, failPointName) {
+ // Use a short maxTimeMS so that the test completes in a reasonable amount of time. We will
+ // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not
+ // prematurely time out.
+ const maxTimeMS = 1000 * 2;
+
+ // Enable a failPoint so that the write will hang.
+ let failpointCommand = {
+ configureFailPoint: failPointName,
+ mode: "alwaysOn",
+ data: {nss: kDBName + "." + kDestCollName}
+ };
+
+ assert.commandWorked(conn.getDB("admin").runCommand(failpointCommand));
+
+ // Make sure we don't run out of time before the failpoint is hit.
+ assert.commandWorked(conn.getDB("admin").runCommand(
+ {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));
+
+ // Build the parallel shell function.
+ let shellStr = `const sourceColl = db['${kSourceCollName}'];`;
+ shellStr += `const destColl = db['${kDestCollName}'];`;
+ shellStr += `const maxTimeMS = ${maxTimeMS};`;
+ shellStr += `const whenMatched = ${tojson(whenMatched)};`;
+ shellStr += `const whenNotMatched = '${whenNotMatched}';`;
+ const runAggregate = function() {
+ const pipeline = [{
+ $merge: {
+ into: destColl.getName(),
+ whenMatched: whenMatched,
+ whenNotMatched: whenNotMatched
+ }
+ }];
+ const err = assert.throws(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS}));
+ assert.eq(err.code, ErrorCodes.MaxTimeMSExpired, "expected aggregation to fail");
+ };
+ shellStr += `(${runAggregate.toString()})();`;
+ const awaitShell = startParallelShell(shellStr, conn.port);
+
+ waitUntilServerHangsOnFailPoint(conn, failPointName);
+
+ assert.commandWorked(conn.getDB("admin").runCommand(
+ {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"}));
+
+ // The aggregation running in the parallel shell will hang on the failpoint, burning
+ // its time. Wait until the maxTimeMS has definitely expired.
+ sleep(maxTimeMS + 2000);
+
+ // Now drop the failpoint, allowing the aggregation to proceed. It should hit an
+ // interrupt check and terminate immediately.
+ assert.commandWorked(
+ conn.getDB("admin").runCommand({configureFailPoint: failPointName, mode: "off"}));
+
+ // Wait for the parallel shell to finish.
+ assert.eq(awaitShell(), 0);
+ }
+
+ function runUnshardedTest(whenMatched, whenNotMatched, conn) {
+ jsTestLog("Running unsharded test in whenMatched: " + whenMatched + " whenNotMatched: " +
+ whenNotMatched);
+ // The target collection is always empty, so skip whenNotMatched: "fail", which would
+ // simply fail on every document.
+ if (whenNotMatched == "fail") {
+ return;
+ }
+
+ const sourceColl = conn.getDB(kDBName)[kSourceCollName];
+ const destColl = conn.getDB(kDBName)[kDestCollName];
+ assert.commandWorked(destColl.remove({}));
+
+ // Be sure we're able to read from a cursor with a maxTimeMS set on it.
+ (function() {
+ // Use a long maxTimeMS, since we expect the operation to finish.
+ const maxTimeMS = 1000 * 600;
+ const pipeline = [{
+ $merge: {
+ into: destColl.getName(),
+ whenMatched: whenMatched,
+ whenNotMatched: whenNotMatched
+ }
+ }];
+ assert.doesNotThrow(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS}));
+ })();
+
+ assert.commandWorked(destColl.remove({}));
+
+ // Force the aggregation to hang while the batch is being written. The failpoint changes
+ // depending on the mode. If 'whenMatched' is set to "fail" then the implementation will end
+ // up issuing insert commands instead of updates.
+ const kFailPointName =
+ whenMatched == "fail" ? "hangDuringBatchInsert" : "hangDuringBatchUpdate";
+ forceAggregationToHangAndCheckMaxTimeMsExpires(
+ whenMatched, whenNotMatched, conn, kFailPointName);
+
+ assert.commandWorked(destColl.remove({}));
+
+ // Force the aggregation to hang while the batch is being built.
+ forceAggregationToHangAndCheckMaxTimeMsExpires(
+ whenMatched, whenNotMatched, conn, "hangWhileBuildingDocumentSourceMergeBatch");
+ }
+
+ // Run on a standalone.
+ (function() {
+ const conn = MongoRunner.runMongod({});
+ assert.neq(null, conn, 'mongod was unable to start up');
+ insertDocs(conn.getDB(kDBName)[kSourceCollName]);
+ withEachMergeMode(
+ (mode) => runUnshardedTest(mode.whenMatchedMode, mode.whenNotMatchedMode, conn));
+ MongoRunner.stopMongod(conn);
+ })();
+
+ // Runs a $merge against 'mongosConn' and verifies that the maxTimeMS value is included in the
+ // command sent to mongod. Since the actual timeout can unreliably happen in mongos before even
+ // reaching the shard, we instead set a very large timeout and verify that the command sent to
+ // mongod includes the maxTimeMS.
+ function runShardedTest(whenMatched, whenNotMatched, mongosConn, mongodConn, comment) {
+ jsTestLog("Running sharded test in whenMatched: " + whenMatched + " whenNotMatched: " +
+ whenNotMatched);
+ // The target collection is always empty, so skip whenNotMatched: "fail", which would
+ // simply fail on every document.
+ if (whenNotMatched == "fail") {
+ return;
+ }
+
+ // Set a large timeout since we expect the command to finish.
+ const maxTimeMS = 1000 * 20;
+
+ const sourceColl = mongosConn.getDB(kDBName)[kSourceCollName];
+ const destColl = mongosConn.getDB(kDBName)[kDestCollName];
+ assert.commandWorked(destColl.remove({}));
+
+ // Make sure we don't time out in mongos before even reaching the shards.
+ assert.commandWorked(mongosConn.getDB("admin").runCommand(
+ {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));
+
+ const cursor = sourceColl.aggregate([{
+ $merge: {
+ into: destColl.getName(),
+ whenMatched: whenMatched,
+ whenNotMatched: whenNotMatched
+ }
+ }],
+ {maxTimeMS: maxTimeMS, comment: comment});
+ assert(!cursor.hasNext());
+
+ // Filter the profiler entries on the existence of $merge, since aggregations through mongos
+ // will include an extra aggregation with an empty pipeline to establish cursors on the
+ // shards.
+ assert.soon(function() {
+ return mongodConn.getDB(kDBName)
+ .system.profile
+ .find({
+ "command.aggregate": kSourceCollName,
+ "command.pipeline.$merge": {"$exists": true},
+ "command.comment": comment,
+ "command.maxTimeMS": maxTimeMS,
+ })
+ .itcount() == 1;
+ });
+
+ assert.commandWorked(mongosConn.getDB("admin").runCommand(
+ {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"}));
+ }
+
+ // Run on a sharded cluster.
+ (function() {
+ const st = new ShardingTest({shards: 2});
+
+ // Ensure shard 0 is the primary shard. This is so that the $merge stage is guaranteed to
+ // run on it.
+ assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: kDBName}));
+ st.ensurePrimaryShard(kDBName, st.shard0.name);
+
+ // Set up the source collection to be sharded in a way such that each node will have some
+ // documents for the remainder of the test.
+ // shard 0: [MinKey, 5]
+ // shard 1: [5, MaxKey]
+ st.shardColl(kSourceCollName,
+ {_id: 1}, // key
+ {_id: 5}, // split
+ {_id: 6}, // move
+ kDBName);
+ insertDocs(st.s.getDB(kDBName)[kSourceCollName]);
+
+ // Start the profiler on each shard so that we can examine the $merge's maxTimeMS.
+ assert.commandWorked(st.shard0.getDB(kDBName).setProfilingLevel(2));
+ assert.commandWorked(st.shard1.getDB(kDBName).setProfilingLevel(2));
+
+ // Run the test with 'destColl' unsharded.
+ withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode,
+ mode.whenNotMatchedMode,
+ st.s,
+ st.shard0,
+ mode + "_unshardedDest"));
+
+ // Run the test with 'destColl' sharded. This means that writes will be sent to both
+ // shards, and if either one hangs, the maxTimeMS will expire.
+ // Shard the destination collection.
+ st.shardColl(kDestCollName,
+ {_id: 1}, // key
+ {_id: 5}, // split
+ {_id: 6}, // move
+ kDBName);
+
+ jsTestLog("Running test forcing shard " + st.shard0.name + " to hang");
+ withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode,
+ mode.whenNotMatchedMode,
+ st.s,
+ st.shard0,
+ mode + "_shardedDest_" + st.shard0.name));
+
+ jsTestLog("Running test forcing shard " + st.shard1.name + " to hang");
+ withEachMergeMode((mode) => runShardedTest(mode.whenMatchedMode,
+ mode.whenNotMatchedMode,
+ st.s,
+ st.shard1,
+ mode + "_shardedDest_" + st.shard1.name));
+
+ st.stop();
+ })();
+})();
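
The new test is organized around the usual failpoint lifecycle. A sketch of that pattern in isolation, using one of the failpoint names exercised above:

    // Turn the failpoint on, restricted to the destination namespace.
    assert.commandWorked(conn.getDB("admin").runCommand({
        configureFailPoint: "hangDuringBatchUpdate",
        mode: "alwaysOn",
        data: {nss: "test.merge_max_time_ms_dest"}
    }));
    // ... start the aggregation in a parallel shell, confirm via $currentOp that it is
    // hanging, and let the maxTimeMS deadline pass ...
    // Turn the failpoint off so the operation resumes and notices its expired deadline.
    assert.commandWorked(conn.getDB("admin").runCommand(
        {configureFailPoint: "hangDuringBatchUpdate", mode: "off"}));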
diff --git a/jstests/noPassthrough/out_max_time_ms.js b/jstests/noPassthrough/out_max_time_ms.js
index 16c5b129261..578ab60a6e2 100644
--- a/jstests/noPassthrough/out_max_time_ms.js
+++ b/jstests/noPassthrough/out_max_time_ms.js
@@ -3,9 +3,8 @@
* @tags: [requires_sharding, requires_replication]
*/
(function() {
- load("jstests/aggregation/extras/out_helpers.js"); // For withEachOutMode().
- load("jstests/libs/fixture_helpers.js"); // For isMongos().
- load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow.
+ load("jstests/libs/fixture_helpers.js"); // For isMongos().
+ load("jstests/libs/profiler.js"); // For profilerHasSingleMatchingEntryOrThrow.
const kDBName = "test";
const kSourceCollName = "out_max_time_ms_source";
@@ -37,27 +36,21 @@
}
/**
- * Given a $out mode and a mongod connection, run a $out aggregation against 'conn' which hangs
- * on the given failpoint and ensure that the $out maxTimeMS expires.
+ * Given a mongod connection, run a $out aggregation against 'conn' which hangs on the given
+ * failpoint and ensure that the $out maxTimeMS expires.
*/
- function forceAggregationToHangAndCheckMaxTimeMsExpires(mode, conn, failPointName) {
+ function forceAggregationToHangAndCheckMaxTimeMsExpires(conn, failPointName) {
// Use a short maxTimeMS so that the test completes in a reasonable amount of time. We will
- // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not
- // prematurely time out.
+ // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not prematurely
+ // time out.
const maxTimeMS = 1000 * 2;
// Enable a failPoint so that the write will hang.
let failpointCommand = {
configureFailPoint: failPointName,
mode: "alwaysOn",
- data: {nss: kDBName + "." + kDestCollName}
};
- // For mode "replaceCollection", the namespace of the writes will be to a temp namespace so
- // remove the restriction on nss.
- if (mode == "replaceCollection")
- delete failpointCommand.data;
-
assert.commandWorked(conn.getDB("admin").runCommand(failpointCommand));
// Make sure we don't run out of time before the failpoint is hit.
@@ -68,9 +61,8 @@
let shellStr = `const sourceColl = db['${kSourceCollName}'];`;
shellStr += `const destColl = db['${kDestCollName}'];`;
shellStr += `const maxTimeMS = ${maxTimeMS};`;
- shellStr += `const mode = '${mode}';`;
const runAggregate = function() {
- const pipeline = [{$out: {to: destColl.getName(), mode: mode}}];
+ const pipeline = [{$out: destColl.getName()}];
const err = assert.throws(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS}));
assert.eq(err.code, ErrorCodes.MaxTimeMSExpired, "expected aggregation to fail");
};
@@ -95,8 +87,8 @@
assert.eq(awaitShell(), 0);
}
- function runUnshardedTest(mode, conn) {
- jsTestLog("Running test in mode " + mode);
+ function runUnshardedTest(conn) {
+ jsTestLog("Running unsharded test");
const sourceColl = conn.getDB(kDBName)[kSourceCollName];
const destColl = conn.getDB(kDBName)[kDestCollName];
@@ -106,7 +98,7 @@
(function() {
// Use a long maxTimeMS, since we expect the operation to finish.
const maxTimeMS = 1000 * 600;
- const pipeline = [{$out: {to: destColl.getName(), mode: mode}}];
+ const pipeline = [{$out: destColl.getName()}];
const cursor = sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS});
assert(!cursor.hasNext());
assert.eq(destColl.countDocuments({_id: {$exists: true}}), nDocs);
@@ -115,18 +107,14 @@
assert.commandWorked(destColl.remove({}));
// Force the aggregation to hang while the batch is being written.
- const kFailPointName =
- mode == "replaceDocuments" ? "hangDuringBatchUpdate" : "hangDuringBatchInsert";
- forceAggregationToHangAndCheckMaxTimeMsExpires(mode, conn, kFailPointName);
+ const kFailPointName = "hangDuringBatchInsert";
+ forceAggregationToHangAndCheckMaxTimeMsExpires(conn, kFailPointName);
assert.commandWorked(destColl.remove({}));
// Force the aggregation to hang while the batch is being built.
- forceAggregationToHangAndCheckMaxTimeMsExpires(
- mode,
- conn,
- mode == "replaceCollection" ? "hangWhileBuildingDocumentSourceOutBatch"
- : "hangWhileBuildingDocumentSourceMergeBatch");
+ forceAggregationToHangAndCheckMaxTimeMsExpires(conn,
+ "hangWhileBuildingDocumentSourceOutBatch");
}
// Run on a standalone.
@@ -134,98 +122,7 @@
const conn = MongoRunner.runMongod({});
assert.neq(null, conn, 'mongod was unable to start up');
insertDocs(conn.getDB(kDBName)[kSourceCollName]);
- withEachOutMode((mode) => runUnshardedTest(mode, conn));
+ runUnshardedTest(conn);
MongoRunner.stopMongod(conn);
})();
-
- // Runs a $out against 'mongosConn' and verifies that the maxTimeMS value is included in the
- // command sent to mongod. Since the actual timeout can unreliably happen in mongos before even
- // reaching the shard, we instead set a very large timeout and verify that the command sent to
- // mongod includes the maxTimeMS.
- function runShardedTest(mode, mongosConn, mongodConn, comment) {
- jsTestLog("Running sharded test in mode " + mode);
- if (mode == "replaceCollection") {
- return;
- }
-
- // Set a large timeout since we expect the command to finish.
- const maxTimeMS = 1000 * 20;
-
- const sourceColl = mongosConn.getDB(kDBName)[kSourceCollName];
- const destColl = mongosConn.getDB(kDBName)[kDestCollName];
- assert.commandWorked(destColl.remove({}));
-
- // Make sure we don't timeout in mongos before even reaching the shards.
- assert.commandWorked(mongosConn.getDB("admin").runCommand(
- {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));
-
- const cursor = sourceColl.aggregate([{$out: {to: destColl.getName(), mode: mode}}],
- {maxTimeMS: maxTimeMS, comment: comment});
- assert(!cursor.hasNext());
-
- // Filter the profiler entries on the existence of $out, since aggregations through mongos
- // will include an extra aggregation with an empty pipeline to establish cursors on the
- // shards.
- assert.soon(function() {
- return mongodConn.getDB(kDBName)
- .system.profile
- .find({
- "command.aggregate": kSourceCollName,
- "command.pipeline.$out": {"$exists": true},
- "command.comment": comment,
- "command.maxTimeMS": maxTimeMS,
- })
- .itcount() == 1;
- });
-
- assert.commandWorked(mongosConn.getDB("admin").runCommand(
- {configureFailPoint: "maxTimeNeverTimeOut", mode: "off"}));
- }
-
- // Run on a sharded cluster.
- (function() {
- const st = new ShardingTest({shards: 2});
-
- // Ensure shard 0 is the primary shard. This is so that the $out stage is guaranteed to
- // run on it.
- assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: kDBName}));
- st.ensurePrimaryShard(kDBName, st.shard0.name);
-
- // Set up the source collection to be sharded in a way such that each node will have some
- // documents for the remainder of the test.
- // shard 0: [MinKey, 5]
- // shard 1: [5, MaxKey]
- st.shardColl(kSourceCollName,
- {_id: 1}, // key
- {_id: 5}, // split
- {_id: 6}, // move
- kDBName);
- insertDocs(st.s.getDB(kDBName)[kSourceCollName]);
-
- // Start the profiler on each shard so that we can examine the $out's maxTimeMS.
- assert.commandWorked(st.shard0.getDB(kDBName).setProfilingLevel(2));
- assert.commandWorked(st.shard1.getDB(kDBName).setProfilingLevel(2));
-
- // // Run the test with 'destColl' unsharded.
- withEachOutMode((mode) => runShardedTest(mode, st.s, st.shard0, mode + "_unshardedDest"));
-
- // Run the test with 'destColl' sharded. This means that writes will be sent to both
- // shards, and if either one hangs, the MaxTimeMS will expire.
- // Shard the destination collection.
- st.shardColl(kDestCollName,
- {_id: 1}, // key
- {_id: 5}, // split
- {_id: 6}, // move
- kDBName);
-
- jsTestLog("Running test forcing shard " + st.shard0.name + " to hang");
- withEachOutMode((mode) => runShardedTest(
- mode, st.s, st.shard0, mode + "_shardedDest_" + st.shard0.name));
-
- jsTestLog("Running test forcing shard " + st.shard1.name + " to hang");
- withEachOutMode((mode) => runShardedTest(
- mode, st.s, st.shard1, mode + "_shardedDest_" + st.shard1.name));
-
- st.stop();
- })();
})();
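
With the mode parameter gone, the $out invocations in this test reduce to the single remaining syntax, which replaces the target collection. A sketch with hypothetical collection names:

    // The only $out form these tests now exercise: a bare collection name.
    db.source.aggregate([{$out: "dest"}], {maxTimeMS: 10 * 1000});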
diff --git a/jstests/noPassthrough/out_majority_read.js b/jstests/noPassthrough/out_merge_majority_read.js
index e27f98033f8..32237708dc0 100644
--- a/jstests/noPassthrough/out_majority_read.js
+++ b/jstests/noPassthrough/out_merge_majority_read.js
@@ -1,5 +1,5 @@
/**
- * Tests that $out with readConcern majority will only see committed data.
+ * Tests that $out and $merge with readConcern majority will only see committed data.
*
* Each operation is tested on a single node, and (if supported) through mongos on both sharded and
* unsharded collections. Mongos doesn't directly handle readConcern majority, but these tests
@@ -48,7 +48,7 @@
assert.commandWorked(targetReplaceDocsColl.insert({_id: 1, state: 'before'}));
setCommittedSnapshot(makeSnapshot());
- // This insert will not be visible to $out.
+ // This insert will not be visible to $merge.
assert.commandWorked(sourceColl.insert({_id: 2, state: 'before'}));
assert.commandWorked(targetReplaceDocsColl.insert({_id: 2, state: 'before'}));
// Similarly this update will not be visible.
@@ -59,17 +59,17 @@
let res = sourceColl.aggregate([], {readConcern: {level: 'majority'}});
assert.eq(res.itcount(), 1);
- // Run $out with the insertDocuments mode. It will pick only the first document. Also it
- // will not see the update ('after').
+ // Run $merge with whenNotMatched set to "insert". It will pick only the first document.
+ // Also it will not see the update ('after').
res = sourceColl.aggregate(
[
{$match: {state: 'before'}},
- {$project: {state: 'out'}},
+ {$project: {state: 'merge'}},
{
- $out: {
- to: targetColl.getName(),
- db: targetColl.getDB().getName(),
- mode: "insertDocuments"
+ $merge: {
+ into: {db: targetColl.getDB().getName(), coll: targetColl.getName()},
+ whenMatched: "fail",
+ whenNotMatched: "insert"
}
}
],
@@ -77,22 +77,24 @@
assert.eq(res.itcount(), 0);
- // Validate the insertDocuments results.
res = targetColl.find().sort({_id: 1});
- // Only a single document is visible ($out did not see the second insert).
- assert.docEq(res.next(), {_id: 1, state: 'out'});
+ // Only a single document is visible ($merge did not see the second insert).
+ assert.docEq(res.next(), {_id: 1, state: 'merge'});
assert(res.isExhausted());
- // The same $out but in replaceDocuments.
+ // The same $merge but with whenMatched set to "replaceWithNew".
res = sourceColl.aggregate(
[
{$match: {state: 'before'}},
- {$project: {state: 'out'}},
+ {$project: {state: 'merge'}},
{
- $out: {
- to: targetReplaceDocsColl.getName(),
- db: targetReplaceDocsColl.getDB().getName(),
- mode: "replaceDocuments"
+ $merge: {
+ into: {
+ db: targetReplaceDocsColl.getDB().getName(),
+ coll: targetReplaceDocsColl.getName()
+ },
+ whenMatched: "replaceWithNew",
+ whenNotMatched: "insert"
}
}
],
@@ -101,30 +103,29 @@
setCommittedSnapshot(makeSnapshot());
- // Validate the replaceDocuments results.
res = targetReplaceDocsColl.find().sort({_id: 1});
- // The first document must overwrite the update that the read portion of $out did not see.
- assert.docEq(res.next(), {_id: 1, state: 'out'});
- // The second document is the result of the independent insert that $out did not see.
+ // The first document must overwrite the update that the read portion of $merge did not see.
+ assert.docEq(res.next(), {_id: 1, state: 'merge'});
+ // The second document is the result of the independent insert that $merge did not see.
assert.docEq(res.next(), {_id: 2, state: 'before'});
assert(res.isExhausted());
assert.commandWorked(targetColl.remove({}));
setCommittedSnapshot(makeSnapshot());
- // Insert a document that will collide with $out insert. The insert is not majority
+ // Insert a document that will collide with $merge insert. The insert is not majority
// committed.
assert.commandWorked(targetColl.insert({_id: 1, state: 'collision'}));
res = db.runCommand({
aggregate: sourceColl.getName(),
pipeline: [
- {$project: {state: 'out'}},
+ {$project: {state: 'merge'}},
{
- $out: {
- to: targetColl.getName(),
- db: targetColl.getDB().getName(),
- mode: "insertDocuments"
+ $merge: {
+ into: {db: targetColl.getDB().getName(), coll: targetColl.getName()},
+ whenMatched: "fail",
+ whenNotMatched: "insert"
}
}
],
@@ -138,16 +139,16 @@
assert.commandWorked(targetColl.remove({_id: 1}));
assert.commandWorked(targetColl.remove({_id: 2}));
- // $out should successfuly 'overwrite' the collection as it is 'empty' (not majority).
+ // $merge should successfully 'overwrite' the collection as it is 'empty' (not majority-committed).
res = targetReplaceDocsColl.aggregate(
[
{$match: {state: 'before'}},
- {$project: {state: 'out'}},
+ {$project: {state: 'merge'}},
{
- $out: {
- to: targetColl.getName(),
- db: targetColl.getDB().getName(),
- mode: "insertDocuments"
+ $merge: {
+ into: {db: targetColl.getDB().getName(), coll: targetColl.getName()},
+ whenMatched: "fail",
+ whenNotMatched: "insert"
}
}
],
@@ -157,10 +158,9 @@
setCommittedSnapshot(makeSnapshot());
- // Validate the insertDocuments results.
res = targetColl.find().sort({_id: 1});
- // Only a single document is visible ($out did not see the second insert).
- assert.docEq(res.next(), {_id: 2, state: 'out'});
+ // Only a single document is visible ($merge did not see the second insert).
+ assert.docEq(res.next(), {_id: 2, state: 'merge'});
assert(res.isExhausted());
}
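
The majority-read runs this file repeats all share one shape, sketched here with hypothetical names: the read half of the stage sees only majority-committed documents, while the write half still lands on the latest data.

    sourceColl.aggregate(
        [
            {$match: {state: 'before'}},
            {$merge: {into: "target", whenMatched: "fail", whenNotMatched: "insert"}}
        ],
        {readConcern: {level: 'majority'}});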
diff --git a/jstests/replsets/linearizable_read_concern.js b/jstests/replsets/linearizable_read_concern.js
index 78867709a8f..b211ebe5f3b 100644
--- a/jstests/replsets/linearizable_read_concern.js
+++ b/jstests/replsets/linearizable_read_concern.js
@@ -78,10 +78,16 @@ load('jstests/libs/write_concern_util.js');
assert.eq(opTimeCmd.code, ErrorCodes.FailedToParse);
// A $out aggregation is not allowed with readConcern level "linearizable".
- let result = assert.throws(
- () => primary.getDB("test").foo.aggregate([{$out: {to: "out", mode: "replaceDocuments"}}],
- {readConcern: {level: "linearizable"}}));
- assert.eq(result.code, ErrorCodes.InvalidOptions);
+ let outResult = assert.throws(() => primary.getDB("test").foo.aggregate(
+ [{$out: "out"}], {readConcern: {level: "linearizable"}}));
+ assert.eq(outResult.code, ErrorCodes.InvalidOptions);
+
+ // A $merge aggregation is not allowed with readConcern level "linearizable".
+ let mergeResult = assert.throws(
+ () => primary.getDB("test").foo.aggregate(
+ [{$merge: {into: "out", whenMatched: "replaceWithNew", whenNotMatched: "insert"}}],
+ {readConcern: {level: "linearizable"}}));
+ assert.eq(mergeResult.code, ErrorCodes.InvalidOptions);
primary = replTest.getPrimary();