summaryrefslogtreecommitdiff
path: root/jstests/sharding/resharding_change_stream_internal_ops.js
blob: db214f55a0d871b4eb7164cdfa61e7eb91160da4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Tests that internal resharding ops are exposed as change stream events during a
// resharding operation. Internal resharding ops include:
// 1. reshardBegin
// 2. reshardDoneCatchUp
//
// @tags: [
//   requires_majority_read_concern,
//   uses_change_streams,
//   requires_fcv_49,
//   uses_atclustertime,
//   disabled_due_to_server_58295
// ]
(function() {
"use strict";

load('jstests/libs/change_stream_util.js');
load("jstests/libs/discover_topology.js");
load("jstests/libs/uuid_util.js");
load("jstests/sharding/libs/resharding_test_fixture.js");

// Use a higher frequency for periodic noops to speed up the test.
const reshardingTest = new ReshardingTest({
    numDonors: 2,
    numRecipients: 1,
    reshardInPlace: false,
    periodicNoopIntervalSecs: 1,
    writePeriodicNoops: true
});
reshardingTest.setup();

const kDbName = "reshardingDb";
const collName = "coll";
const ns = kDbName + "." + collName;

const donorShardNames = reshardingTest.donorShardNames;
const recipientShardNames = reshardingTest.recipientShardNames;
const sourceCollection = reshardingTest.createShardedCollection({
    ns: ns,
    shardKeyPattern: {oldKey: 1},
    chunks: [
        {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
        {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}
    ],
    primaryShardName: donorShardNames[0]
});

const mongos = sourceCollection.getMongo();
const topology = DiscoverTopology.findConnectedNodes(mongos);

const donor0 = new Mongo(topology.shards[donorShardNames[0]].primary);
const cstDonor0 = new ChangeStreamTest(donor0.getDB(kDbName));
let changeStreamsCursorDonor0 = cstDonor0.startWatchingChanges(
    {pipeline: [{$changeStream: {showMigrationEvents: true}}], collection: collName});

const donor1 = new Mongo(topology.shards[donorShardNames[1]].primary);
const cstDonor1 = new ChangeStreamTest(donor1.getDB(kDbName));
let changeStreamsCursorDonor1 = cstDonor1.startWatchingChanges(
    {pipeline: [{$changeStream: {showMigrationEvents: true}}], collection: collName});

const recipient0 = new Mongo(topology.shards[recipientShardNames[0]].primary);
const cstRecipient0 = new ChangeStreamTest(recipient0.getDB(kDbName));

let reshardingUUID;
let changeStreamsCursorRecipient0;

reshardingTest.withReshardingInBackground(
    {
        // If a donor is also a recipient, the donor state machine will run renameCollection with
        // {dropTarget : true} rather than running drop and letting the recipient state machine run
        // rename at the end of the resharding operation. So, we ensure that only one of the donor
        // shards will also be a recipient shard in order to verify that neither the rename with
        // {dropTarget : true} nor the drop command are picked up by the change streams cursor.
        newShardKeyPattern: {newKey: 1},
        newChunks: [
            {min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]},
        ],
    },
    (tempNs) => {
        // Wait until participants are aware of the resharding operation.
        reshardingTest.awaitCloneTimestampChosen();

        const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne();
        reshardingUUID = coordinatorDoc._id;

        changeStreamsCursorRecipient0 = cstRecipient0.startWatchingChanges({
            pipeline: [{$changeStream: {showMigrationEvents: true, allowToRunOnSystemNS: true}}],
            collection: tempNs.substring(tempNs.indexOf('.') + 1)
        });

        // Check for reshardBegin event on both donors.
        const expectedReshardBeginEvent = {
            reshardingUUID: reshardingUUID,
            operationType: "reshardBegin"
        };

        cstDonor0.assertNextChangesEqual(
            {cursor: changeStreamsCursorDonor0, expectedChanges: [expectedReshardBeginEvent]});
        cstDonor1.assertNextChangesEqual(
            {cursor: changeStreamsCursorDonor1, expectedChanges: [expectedReshardBeginEvent]});
    },
    {
        postDecisionPersistedFn: () => {
            // Check for reshardDoneCatchUp event on the recipient.
            const expectedReshardDoneCatchUpEvent = {
                reshardingUUID: reshardingUUID,
                operationType: "reshardDoneCatchUp"
            };

            cstRecipient0.assertNextChangesEqual({
                cursor: changeStreamsCursorRecipient0,
                expectedChanges: [expectedReshardDoneCatchUpEvent]
            });
        }
    });

reshardingTest.teardown();
})();