1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
// Tests that a $changeStream pipeline is split rather than forwarded even in the case where the
// cluster only has a single shard, and that it can therefore successfully look up a document in a
// sharded collection.
// @tags: [
// requires_majority_read_concern,
// uses_change_streams,
// ]
(function() {
"use strict";
// Create a cluster with only 1 shard.
const st = new ShardingTest({
shards: 1,
rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
});
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];
// Enable sharding, shard on _id, and insert a test document which will be updated later.
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
assert.commandWorked(
mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
assert.commandWorked(mongosColl.insert({_id: 1}));
// Verify that the pipeline splits and merges on mongoS despite only targeting a single shard.
const explainPlan = assert.commandWorked(
mongosColl.explain().aggregate([{$changeStream: {fullDocument: "updateLookup"}}]));
assert.neq(explainPlan.splitPipeline, null);
assert.eq(explainPlan.mergeType, "mongos");
// Open a $changeStream on the collection with 'updateLookup' and update the test doc.
const stream = mongosColl.watch([], {fullDocument: "updateLookup"});
const wholeDbStream = mongosDB.watch([], {fullDocument: "updateLookup"});
mongosColl.update({_id: 1}, {$set: {updated: true}});
// Verify that the document is successfully retrieved from the single-collection and whole-db
// change streams.
assert.soon(() => stream.hasNext());
assert.docEq(stream.next().fullDocument, {_id: 1, updated: true});
assert.soon(() => wholeDbStream.hasNext());
assert.docEq(wholeDbStream.next().fullDocument, {_id: 1, updated: true});
stream.close();
wholeDbStream.close();
st.stop();
})();
|