// path: root/jstests/sharding/change_stream_lookup_single_shard_cluster.js
// blob: 53fed91912558b9629c69b1a58637cbfddf8a07d (plain)
// Tests that a $changeStream pipeline is split rather than forwarded even in the case where the
// cluster only has a single shard, and that it can therefore successfully look up a document in a
// sharded collection.
// @tags: [uses_change_streams]
(function() {
"use strict";

// For supportsMajorityReadConcern.
load('jstests/multiVersion/libs/causal_consistency_helpers.js');

// TODO (SERVER-38673): Remove this once BACKPORT-3428, BACKPORT-3429 are completed.
// Bail out early when majority read concern is disabled and mongos runs the 'last-stable'
// binary; the log message below gives the reason.
if (!jsTestOptions().enableMajorityReadConcern &&
    jsTestOptions().mongosBinVersion === 'last-stable') {
    jsTestLog(
        "Skipping test since 'last-stable' mongos doesn't support speculative majority update lookup queries.");
    return;
}

// This test only works on storage engines that support committed reads, skip it if the
// configured engine doesn't support it.
if (!supportsMajorityReadConcern()) {
    jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
    return;
}

// Create a cluster with only 1 shard, backed by a single-node replica set that is configured
// with 'writePeriodicNoops' enabled and a 'periodicNoopIntervalSecs' of 1.
const st = new ShardingTest({
    shards: 1,
    rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
});

// Both the test database and the test collection are named after this test and are accessed
// through the first mongos.
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];

// Enable sharding, shard on _id, and insert a test document which will be updated later.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
assert.commandWorked(
    mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
assert.writeOK(mongosColl.insert({_id: 1}));

// Verify that the pipeline splits and merges on mongoS despite only targeting a single shard.
const explainPlan = assert.commandWorked(
    mongosColl.explain().aggregate([{$changeStream: {fullDocument: "updateLookup"}}]));
assert.neq(explainPlan.splitPipeline, null);
assert.eq(explainPlan.mergeType, "mongos");

// Open a $changeStream on the collection, and another on the whole database, both with
// 'updateLookup'. The streams are opened before the update so that both observe it.
const stream = mongosColl.watch([], {fullDocument: "updateLookup"});
const wholeDbStream = mongosDB.watch([], {fullDocument: "updateLookup"});

// Update the test doc. The write result is asserted, like the insert above, so that a failed
// update is reported directly instead of surfacing later as an 'assert.soon' timeout while
// waiting for a change event that will never arrive.
assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}}));

// Verify that the document is successfully retrieved from the single-collection and whole-db
// change streams.
assert.soon(() => stream.hasNext());
assert.docEq(stream.next().fullDocument, {_id: 1, updated: true});

assert.soon(() => wholeDbStream.hasNext());
assert.docEq(wholeDbStream.next().fullDocument, {_id: 1, updated: true});

// Close both change stream cursors before shutting down the cluster.
stream.close();
wholeDbStream.close();

st.stop();
})();