summaryrefslogtreecommitdiff
path: root/jstests/sharding/causal_consistency_shell_support.js
blob: cb3e02e119c57ff684848856eea154fddee5aca2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/**
 * Tests which commands support causal consistency in the Mongo shell, that for each supported
 * command, the shell properly forwards its operation and cluster time and updates them based on the
 * response, and that the server rejects commands with afterClusterTime ahead of cluster time.
 *
 * @tags: [requires_majority_read_concern]
 */
(function() {
"use strict";

// Verifies the command works and properly updates operation or cluster time.
function runCommandAndCheckLogicalTimes(cmdObj, db, shouldAdvance) {
    const session = db.getSession();

    // Extract initial operation and cluster time.
    let operationTime = session.getOperationTime();
    let clusterTimeObj = session.getClusterTime();

    assert.commandWorked(db.runCommand(cmdObj));

    // Verify cluster and operation time.
    if (shouldAdvance) {
        assert(bsonWoCompare(session.getOperationTime(), operationTime) > 0,
               "expected the shell's operationTime to increase after running command: " +
                   tojson(cmdObj));
        assert(bsonWoCompare(session.getClusterTime().clusterTime, clusterTimeObj.clusterTime) > 0,
               "expected the shell's clusterTime value to increase after running command: " +
                   tojson(cmdObj));
    } else {
        // Don't expect either clusterTime or operationTime to not change, because they may be
        // incremented by unrelated activity in the cluster.
    }
}

// Verifies the command works and correctly updates the shell's operationTime.
function commandWorksAndUpdatesOperationTime(cmdObj, db) {
    const session = db.getSession();

    // Use the latest cluster time returned as a new operationTime and run command.
    const clusterTimeObj = session.getClusterTime();
    session.advanceOperationTime(clusterTimeObj.clusterTime);
    assert.commandWorked(testDB.runCommand(cmdObj));

    // Verify the response contents and that new operation time is >= passed in time.
    assert(bsonWoCompare(session.getOperationTime(), clusterTimeObj.clusterTime) >= 0,
           "expected the shell's operationTime to be >= to:" + tojson(clusterTimeObj.clusterTime) +
               " after running command: " + tojson(cmdObj));
}

// Manually create a shard so tests on storage engines that don't support majority readConcern
// can exit early.
const rsName = "causal_consistency_shell_support_rs";
const rst = new ReplSetTest({
    nodes: 1,
    name: rsName,
    nodeOptions: {
        enableMajorityReadConcern: "",
        shardsvr: "",
    }
});

rst.startSet();
rst.initiate();

// Start the sharding test and add the majority readConcern enabled replica set.
const name = "causal_consistency_shell_support";
const st = new ShardingTest({name: name, shards: 1, manualAddShard: true});
assert.commandWorked(st.s.adminCommand({addShard: rst.getURL()}));

const testDB = st.s.getDB("test");
const session = testDB.getSession();

// Verify causal consistency is disabled unless explicitly set.
assert.eq(testDB.getMongo()._causalConsistency,
          false,
          "causal consistency should be disabled by default");
testDB.getMongo().setCausalConsistency(true);

// Verify causal consistency is enabled for the connection and for each supported command.
assert.eq(testDB.getMongo()._causalConsistency,
          true,
          "calling setCausalConsistency() didn't enable causal consistency");

// Verify cluster times are tracked even before causal consistency is set (so the first
// operation with causal consistency set can use valid cluster times).
session.resetOperationTime_forTesting();

assert.commandWorked(testDB.runCommand({insert: "foo", documents: [{x: 1}]}));
assert.neq(session.getOperationTime(), null);
assert.neq(session.getClusterTime(), null);

session.resetOperationTime_forTesting();

assert.commandWorked(testDB.runCommand({find: "foo"}));
assert.neq(session.getOperationTime(), null);
assert.neq(session.getClusterTime(), null);

// Test that write commands advance both operation and cluster time.
runCommandAndCheckLogicalTimes({insert: "foo", documents: [{x: 2}]}, testDB, true);
runCommandAndCheckLogicalTimes(
    {update: "foo", updates: [{q: {x: 2}, u: {$set: {x: 3}}}]}, testDB, true);

// Test that each supported command works as expected and the shell's cluster times are properly
// forwarded to the server and updated based on the response.
testDB.getMongo().setCausalConsistency(true);

// Aggregate command.
let aggColl = "aggColl";
let aggCmd = {aggregate: aggColl, pipeline: [{$match: {x: 1}}], cursor: {}};

runCommandAndCheckLogicalTimes({insert: aggColl, documents: [{_id: 1, x: 1}]}, testDB, true);
runCommandAndCheckLogicalTimes(aggCmd, testDB, false);
commandWorksAndUpdatesOperationTime(aggCmd, testDB);

// Count command.
let countColl = "countColl";
let countCmd = {count: countColl};

runCommandAndCheckLogicalTimes({insert: countColl, documents: [{_id: 1, x: 1}]}, testDB, true);
runCommandAndCheckLogicalTimes(countCmd, testDB, false);
commandWorksAndUpdatesOperationTime(countCmd, testDB);

// Distinct command.
let distinctColl = "distinctColl";
let distinctCmd = {distinct: distinctColl, key: "x"};

runCommandAndCheckLogicalTimes({insert: distinctColl, documents: [{_id: 1, x: 1}]}, testDB, true);
runCommandAndCheckLogicalTimes(distinctCmd, testDB, false);
commandWorksAndUpdatesOperationTime(distinctCmd, testDB);

// Find command.
let findColl = "findColl";
let findCmd = {find: findColl};

runCommandAndCheckLogicalTimes({insert: findColl, documents: [{_id: 1, x: 1}]}, testDB, true);
runCommandAndCheckLogicalTimes(findCmd, testDB, false);
commandWorksAndUpdatesOperationTime(findCmd, testDB);

// Aggregate command with $geoNear.
let geoNearColl = "geoNearColl";
let geoNearCmd = {
    aggregate: geoNearColl,
    cursor: {},
    pipeline: [
        {
            $geoNear: {
                near: {type: "Point", coordinates: [-10, 10]},
                distanceField: "dist",
                spherical: true
            }
        },
    ],
};

assert.commandWorked(testDB[geoNearColl].createIndex({loc: "2dsphere"}));
runCommandAndCheckLogicalTimes(
    {insert: geoNearColl, documents: [{_id: 1, loc: {type: "Point", coordinates: [-10, 10]}}]},
    testDB,
    true);
runCommandAndCheckLogicalTimes(geoNearCmd, testDB, false);
commandWorksAndUpdatesOperationTime(geoNearCmd, testDB);

// MapReduce doesn't currently support read concern majority.

// Verify that the server rejects commands when operation time is invalid by running a command
// with an afterClusterTime value one day ahead.
const invalidTime = new Timestamp(session.getOperationTime().getTime() + (60 * 60 * 24), 0);
const invalidCmd = {
    find: "foo",
    readConcern: {level: "majority", afterClusterTime: invalidTime}
};
assert.commandFailedWithCode(testDB.runCommand(invalidCmd),
                             ErrorCodes.InvalidOptions,
                             "expected command, " + tojson(invalidCmd) + ", to fail with code, " +
                                 ErrorCodes.InvalidOptions +
                                 ", because the afterClusterTime value, " + tojson(invalidTime) +
                                 ", should not be ahead of the clusterTime, " +
                                 tojson(session.getClusterTime().clusterTime));

rst.stopSet();
st.stop();
})();