1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
/**
* Tests that resuming a change stream with {fullDocument: 'updateLookup'} does not throw
* ChangeStreamFatalError now that SERVER-44598 is fixed.
*
* Remove the requires_fcv_44 tag if SERVER-44598 is backported or 4.4 becomes last-stable.
* @tags: [uses_change_streams, requires_fcv_44]
*/
(function() {
    "use strict";

    // The UUID consistency check can hit NotMasterNoSlaveOk when it attempts to obtain a list of
    // collections from the shard primaries through mongoS at the end of this test.
    TestData.skipCheckingUUIDsConsistentAcrossCluster = true;

    // Start a new sharded cluster with 2 shards (each a 3-node replica set) and obtain references
    // to the test DB and collection. Periodic no-op writes are enabled on every node.
    const st = new ShardingTest({
        shards: 2,
        mongos: 1,
        rs: {nodes: 3, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
    });

    const mongosDB = st.s.getDB(jsTestName());
    const mongosColl = mongosDB.test;
    const shard0 = st.rs0;

    // Enable sharding on the test database and ensure that the primary is shard0.
    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
    st.ensurePrimaryShard(mongosDB.getName(), shard0.getURL());

    // Shard the source collection on {a: 1}, split across the shards at {a: 0}.
    st.shardColl(mongosColl, {a: 1}, {a: 0}, {a: 1});

    // Open a change stream on the collection.
    const csCursor = mongosColl.watch();

    // Write one document onto shard0 and obtain its resume token.
    assert.commandWorked(mongosColl.insert({_id: 0, a: -100}));
    assert.soon(() => csCursor.hasNext());
    const resumeToken = csCursor.next()._id;

    // Pick any secondary of shard0 to become the new primary.
    const newPrimary = st.rs0.getSecondary();

    // NOTE(review): shards[0] is assumed to be the config.shards entry for shard0 (rs0) - the
    // query below has no sort or filter, so confirm this ordering is guaranteed.
    const shards = st.s.getDB('config').shards.find().toArray();

    // Step up one of the secondaries, which will not have any sharding metadata loaded.
    st.rs0.stepUpNoAwaitReplication(newPrimary);

    // Make sure the mongos refreshes its connections to the shard: poll connPoolStats until the
    // host it reports as master for the shard's replica set is the node we just stepped up.
    // assert.soon is used (rather than an open-ended loop) so that the wait pauses between
    // attempts and fails with a message instead of hanging if the new primary is never seen.
    assert.soon(() => {
        const connPoolStats = st.s0.adminCommand({connPoolStats: 1});
        const primary =
            connPoolStats.replicaSets[shards[0]._id].hosts.find((host) => host.ismaster) || {};
        return newPrimary.host === primary.addr;
    }, `mongos did not report ${newPrimary.host} as the primary of ${shards[0]._id}`);

    // Do a {multi:true} update. This will scatter to all shards and update the document on shard0.
    // Because no metadata is loaded, the shard will return a StaleShardVersion and fetch it, and
    // the operation will be retried.
    assert.soonNoExcept(
        () => assert.commandWorked(
            mongosColl.update({_id: 0}, {$set: {updated: true}}, false, true)));

    // Resume the change stream with {fullDocument: 'updateLookup'}. The command succeeding is the
    // check here; its reply is not inspected further.
    assert.commandWorked(mongosColl.runCommand("aggregate", {
        pipeline: [{$changeStream: {resumeAfter: resumeToken, fullDocument: "updateLookup"}}],
        cursor: {}
    }));

    // The originally opened stream must also observe the update event.
    assert.soon(() => csCursor.hasNext());
    const updateObj = csCursor.next();
    assert.eq(true, updateObj.updateDescription.updatedFields.updated);

    st.stop();
})();
|