summaryrefslogtreecommitdiff
path: root/jstests/sharding/change_stream_lookup_single_shard_cluster.js
blob: 4fe6eda48583ca36338da08017221c415d593216 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Tests that a $changeStream pipeline is split rather than forwarded even in the case where the
// cluster only has a single shard, and that it can therefore successfully look up a document in a
// sharded collection.
// @tags: [uses_change_streams, requires_majority_read_concern]
(function() {
"use strict";

// Create a cluster with only 1 shard, backed by a single-node replica set. Periodic no-op writes
// are enabled at a 1-second interval on the shard.
// NOTE(review): presumably the no-ops keep the change stream cursors advancing while the
// collection is otherwise idle - confirm.
const st = new ShardingTest({
    shards: 1,
    rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
});

// Both the database and the collection are named after the test, and are accessed via mongoS.
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];

// Enable sharding, shard on _id, and insert a test document which will be updated later.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
assert.commandWorked(
    mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
assert.commandWorked(mongosColl.insert({_id: 1}));

// Verify that the pipeline splits and merges on mongoS despite only targeting a single shard.
// A non-null 'splitPipeline' in the explain output shows that the pipeline was split, and
// 'mergeType' reports where the merging half runs.
const explainPlan = assert.commandWorked(
    mongosColl.explain().aggregate([{$changeStream: {fullDocument: "updateLookup"}}]));
assert.neq(explainPlan.splitPipeline, null);
assert.eq(explainPlan.mergeType, "mongos");

// Open a $changeStream on the collection with 'updateLookup' and update the test doc. Both the
// single-collection and the whole-db streams are opened before the update is issued, so each of
// them is expected to observe it.
const stream = mongosColl.watch([], {fullDocument: "updateLookup"});
const wholeDbStream = mongosDB.watch([], {fullDocument: "updateLookup"});

// Check the write result so that a failed update is reported here, rather than surfacing later
// as an unexplained timeout while waiting for the change event.
assert.commandWorked(mongosColl.update({_id: 1}, {$set: {updated: true}}));

// Verify that the document is successfully retrieved from the single-collection and whole-db
// change streams. The looked-up 'fullDocument' must reflect the post-update state.
assert.soon(() => stream.hasNext());
assert.docEq(stream.next().fullDocument, {_id: 1, updated: true});

assert.soon(() => wholeDbStream.hasNext());
assert.docEq(wholeDbStream.next().fullDocument, {_id: 1, updated: true});

// Close both cursors before tearing down the cluster.
stream.close();
wholeDbStream.close();

st.stop();
})();