/**
 * Test that an aggregation with a $out stage obeys the maxTimeMS.
 * @tags: [requires_sharding, requires_replication]
 */
(function() {
load("jstests/libs/fixture_helpers.js");  // For isMongos().
load("jstests/libs/profiler.js");         // For profilerHasSingleMatchingEntryOrThrow.

const kDBName = "test";
const kSourceCollName = "out_max_time_ms_source";
const kDestCollName = "out_max_time_ms_dest";
const nDocs = 10;

/**
 * Helper for populating the collection.
 */
function insertDocs(coll) {
    for (let i = 0; i < nDocs; i++) {
        assert.commandWorked(coll.insert({_id: i}));
    }
}

/**
 * Wait until the server sets its CurOp "msg" to the failpoint name, indicating that it's
 * hanging.
 */
function waitUntilServerHangsOnFailPoint(conn, fpName) {
    // Be sure that the server is hanging on the failpoint.
    assert.soon(function() {
        const filter = {"msg": fpName};
        const ops = conn.getDB("admin")
                        .aggregate([{$currentOp: {allUsers: true}}, {$match: filter}])
                        .toArray();
        return ops.length == 1;
    });
}

/**
 * Given a connection 'conn', run a $out aggregation against it that hangs on the given
 * failpoint, and ensure that the aggregation fails with MaxTimeMSExpired once its maxTimeMS
 * expires.
 */
function forceAggregationToHangAndCheckMaxTimeMsExpires(conn, failPointName) {
    // Use a short maxTimeMS so that the test completes in a reasonable amount of time. We will
    // use the 'maxTimeNeverTimeOut' failpoint to ensure that the operation does not prematurely
    // time out.
    const maxTimeMS = 1000 * 2;

    // Enable a failPoint so that the write will hang.
    let failpointCommand = {
        configureFailPoint: failPointName,
        mode: "alwaysOn",
    };

    assert.commandWorked(conn.getDB("admin").runCommand(failpointCommand));

    // Make sure we don't run out of time before the failpoint is hit.
    assert.commandWorked(conn.getDB("admin").runCommand(
        {configureFailPoint: "maxTimeNeverTimeOut", mode: "alwaysOn"}));

    // Build the parallel shell function.
    let shellStr = `const sourceColl = db['${kSourceCollName}'];`;
    shellStr += `const destColl = db['${kDestCollName}'];`;
    shellStr += `const maxTimeMS = ${maxTimeMS};`;
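    // The aggregation runs in a parallel shell, which does not share this function's scope, so
    // the helper below is stringified into 'shellStr' along with the constants injected above.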
    const runAggregate = function() {
        const pipeline = [{$out: destColl.getName()}];
        const err = assert.throws(() => sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS}));
        assert.eq(err.code, ErrorCodes.MaxTimeMSExpired, "expected MaxTimeMSExpired error");
    };
    shellStr += `(${runAggregate.toString()})();`;
    const awaitShell = startParallelShell(shellStr, conn.port);

    waitUntilServerHangsOnFailPoint(conn, failPointName);

    // The aggregation is now hanging on the failpoint, so re-enable maxTimeMS enforcement and
    // let its timer expire while the operation is blocked.
    assert.commandWorked(
        conn.getDB("admin").runCommand({configureFailPoint: "maxTimeNeverTimeOut", mode: "off"}));

    // The aggregation running in the parallel shell will hang on the failpoint, burning
    // its time. Wait until the maxTimeMS has definitely expired.
    sleep(maxTimeMS + 2000);

    // Now drop the failpoint, allowing the aggregation to proceed. It should hit an
    // interrupt check and terminate immediately.
    assert.commandWorked(
        conn.getDB("admin").runCommand({configureFailPoint: failPointName, mode: "off"}));

    // Wait for the parallel shell to finish.
    assert.eq(awaitShell(), 0);
}

function runUnshardedTest(conn) {
    jsTestLog("Running unsharded test");

    const sourceColl = conn.getDB(kDBName)[kSourceCollName];
    const destColl = conn.getDB(kDBName)[kDestCollName];
    assert.commandWorked(destColl.remove({}));

    // Be sure we're able to read from a cursor with a maxTimeMS set on it.
    (function() {
        // Use a long maxTimeMS, since we expect the operation to finish.
        const maxTimeMS = 1000 * 600;
        const pipeline = [{$out: destColl.getName()}];
        const cursor = sourceColl.aggregate(pipeline, {maxTimeMS: maxTimeMS});
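        // $out writes its results to the destination collection and returns no documents to the
        // client, so the cursor should be empty and 'destColl' should contain every source doc.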
        assert(!cursor.hasNext());
        assert.eq(destColl.countDocuments({_id: {$exists: true}}), nDocs);
    })();

    assert.commandWorked(destColl.remove({}));

    // Force the aggregation to hang while the batch is being written.
    const kFailPointName = "hangDuringBatchInsert";
    forceAggregationToHangAndCheckMaxTimeMsExpires(conn, kFailPointName);

    assert.commandWorked(destColl.remove({}));

    // Force the aggregation to hang while the batch is being built.
    forceAggregationToHangAndCheckMaxTimeMsExpires(conn, "hangWhileBuildingDocumentSourceOutBatch");
}

// Run on a standalone.
(function() {
const conn = MongoRunner.runMongod({});
assert.neq(null, conn, 'mongod was unable to start up');
insertDocs(conn.getDB(kDBName)[kSourceCollName]);
runUnshardedTest(conn);
MongoRunner.stopMongod(conn);
})();
})();