summaryrefslogtreecommitdiff
path: root/jstests/concurrency
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2020-02-19 14:05:09 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-16 18:54:35 +0000
commit54488c22e2ce672a8bdbb2dac68941b958e69b5c (patch)
tree47a94d595273258d224611499a8c416b511aa220 /jstests/concurrency
parent9220961e1a853d034b862cb5a83d60813d17394c (diff)
downloadmongo-54488c22e2ce672a8bdbb2dac68941b958e69b5c.tar.gz
SERVER-45541 Test interrupting $unionWith.
Diffstat (limited to 'jstests/concurrency')
-rw-r--r--jstests/concurrency/fsm_workloads/agg_out_interrupt_cleanup.js32
-rw-r--r--jstests/concurrency/fsm_workloads/agg_unionWith_interrupt_cleanup.js112
2 files changed, 133 insertions, 11 deletions
diff --git a/jstests/concurrency/fsm_workloads/agg_out_interrupt_cleanup.js b/jstests/concurrency/fsm_workloads/agg_out_interrupt_cleanup.js
index bc6c9b48515..d6bdebb5d57 100644
--- a/jstests/concurrency/fsm_workloads/agg_out_interrupt_cleanup.js
+++ b/jstests/concurrency/fsm_workloads/agg_out_interrupt_cleanup.js
@@ -1,6 +1,12 @@
/**
* Tests $out stage of aggregate command concurrently with killOp. Ensures that all the temporary
- * collections created during aggreate command are deleted.
+ * collections created during aggreate command are deleted. If extending this workload, consider
+ * overriding the following:
+ * - $config.states.aggregate: The function to execute the aggregation.
+ * - $config.states.killOp: The function to find the aggregation and kill it. Consider reusing
+ * $config.data.killOpsMatchingFilter to do the deed.
+ * - $config.teardown: If you want any assertion to make sure nothing got leaked or left behind by
+ * the interrupted aggregation.
*
* @tags: [uses_curop_agg_stage]
*/
@@ -15,11 +21,23 @@ var $config = extendWorkload($config, function($config, $super) {
{aggregate: collName, pipeline: [{$out: "interrupt_temp_out"}], cursor: {}});
};
+ // This test sets up aggregations just to tear them down. There's no benefit to using large
+ // documents here, and doing so can increase memory pressure on the test host, so we lower it
+ // down to 1KB.
+ $config.data.docSize = 1024;
+ $config.data.killOpsMatchingFilter = function killOpsMatchingFilter(db, filter) {
+ const currentOpOutput =
+ db.getSiblingDB('admin').aggregate([{$currentOp: {}}, {$match: filter}]).toArray();
+ for (let op of currentOpOutput) {
+ assert(op.hasOwnProperty('opid'));
+ assertAlways.commandWorked(db.getSiblingDB('admin').killOp(op.opid));
+ }
+ };
$config.states.killOp = function killOp(db, collName) {
// The aggregate command could be running different commands internally (renameCollection,
// insertDocument, etc.) depending on which stage of execution it is in. So, get all the
// operations that are running against the input, output or temp collections.
- const activeCurOpsFilter = {
+ this.killOpsMatchingFilter(db, {
op: "command",
active: true,
$or: [
@@ -31,15 +49,7 @@ var $config = extendWorkload($config, function($config, $super) {
$exists: false
} // Exclude 'drop' command from the filter to make sure that we don't kill the the
// drop command which is responsible for dropping the temporary collection.
- };
-
- const currentOpOutput = db.getSiblingDB('admin')
- .aggregate([{$currentOp: {}}, {$match: activeCurOpsFilter}])
- .toArray();
- for (let op of currentOpOutput) {
- assert(op.hasOwnProperty('opid'));
- assertAlways.commandWorked(db.getSiblingDB('admin').killOp(op.opid));
- }
+ });
};
$config.teardown = function teardown(db, collName, cluster) {
diff --git a/jstests/concurrency/fsm_workloads/agg_unionWith_interrupt_cleanup.js b/jstests/concurrency/fsm_workloads/agg_unionWith_interrupt_cleanup.js
new file mode 100644
index 00000000000..912e26e80c0
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/agg_unionWith_interrupt_cleanup.js
@@ -0,0 +1,112 @@
+/**
+ * Tests $unionWith stage of aggregate command concurrently with killOp. Ensures that all cursors
+ * opened on behalf of the $unionWith are killed when interrupted.
+ *
+ * @tags: [
+ * uses_curop_agg_stage,
+ * requires_fcv_44, # Uses $unionWith
+ * ]
+ */
+'use strict';
+load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload
+load('jstests/concurrency/fsm_workloads/agg_out_interrupt_cleanup.js'); // for $config
+
+var $config = extendWorkload($config, function($config, $super) {
+ $config.data.commentStr = "agg_unionWith_interrupt_cleanup";
+
+ $config.states.aggregate = function aggregate(db, collName) {
+ // Here we consistenly union with the same namespace to benefit from the sharded collection
+ // setup that may have been done in sharded passthroughs.
+ // TODO SERVER-46251 use multiple namespaces.
+ let response = db[collName].runCommand({
+ aggregate: collName,
+ pipeline: [{$unionWith: {coll: collName, pipeline: [{$unionWith: collName}]}}],
+ comment: this.commentStr,
+ // Use a small batch size to ensure these operations open up a cursor and use multiple
+ // getMores. We want to give coverage to interrupting the getMores as well.
+ cursor: {batchSize: this.numDocs / 4}
+ });
+ // Keep iterating the cursor until we exhaust it or we are interrupted.
+ while (response.ok && response.cursor.id != 0) {
+ response = db[collName].runCommand({getMore: response.cursor.id, collection: collName});
+ }
+ if (!response.ok) {
+ // If the interrupt happens just as the cursor is being checked back in, the cursor will
+ // be killed without failing the operation. When this happens, the next getMore will
+ // fail with CursorNotFound.
+ assertWhenOwnColl.contains(
+ response.code, [ErrorCodes.Interrupted, ErrorCodes.CursorNotFound], response);
+ }
+ };
+
+ $config.states.killOp = function killOp(db, collName) {
+ // The aggregate command could be running different sub-aggregates internally depending on
+ // which stage of execution it is in. So we rely on the comment to detect which operations
+ // are eligible to be interrupted, and interrupt those.
+ this.killOpsMatchingFilter(db, {
+ $and: [
+ {active: true},
+ {
+ $or: [
+ {"command.comment": this.commentStr},
+ {"cursor.originatingCommand.comment": this.commentStr},
+ ]
+ }
+ ]
+ });
+ };
+
+ $config.teardown = function teardown(db, collName, cluster) {
+ // Ensure that no operations, cursors, or sub-operations are left active. After
+ // SERVER-46255, We normally expect all operations to be cleaned up safely, but there are
+ // race conditions or possible network blips where the kill won't arrive as expected. We
+ // don't want to block the interrupt thread or the operation itself to wait around to make
+ // sure everything dies correctly, so we just rely on cursor timeouts or session reaps to
+ // cover these rare cases. Here we make sure everything is cleaned up so we avoid hogging
+ // resources for future tests.
+ this.killOpsMatchingFilter(db, {
+ $and: [
+ {active: true},
+ {
+ $or: [
+ {"command.comment": this.commentStr},
+ {"cursor.originatingCommand.comment": this.commentStr},
+ ]
+ }
+ ]
+ });
+ const curOpCursor = db.getSiblingDB("admin").aggregate([
+ {$currentOp: {idleCursors: true}},
+ {$match: {"cursor.originatingCommand.comment": this.commentStr}},
+ {$project: {shard: 1, host: 1, "cursor.cursorId": 1}},
+ ]);
+ while (curOpCursor.hasNext()) {
+ let result = curOpCursor.next();
+ assertAlways.commandWorked(
+ new Mongo(`${result.shard}/${result.host}`).getDB(db.getName()).runCommand({
+ killCursors: collName,
+ cursors: [result.cursor.cursorId]
+ }));
+ }
+ const remainingOps =
+ db.getSiblingDB("admin")
+ .aggregate([
+ {$currentOp: {idleCursors: true}},
+ // Look for any trace of state that wasn't cleaned up.
+ {
+ $match: {
+ $or: [
+ // The originating aggregation or a sub-aggregation still active.
+ {"command.comment": this.commentStr},
+ // An idle cursor left around.
+ {"cursor.originatingCommand.comment": this.commentStr}
+ ]
+ }
+ }
+ ])
+ .toArray();
+ assertAlways.eq(remainingOps.length, 0, remainingOps);
+ };
+
+ return $config;
+});