summaryrefslogtreecommitdiff
path: root/jstests/change_streams/migrate_last_chunk_from_shard_event.js
blob: 0ff793b6d957984fb973a4bcca9c2d66b3c63941 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/**
 * Test that change streams return migrateLastChunkFromShard events.
 *
 *  @tags: [
 *    requires_fcv_60,
 *    requires_sharding,
 *    uses_change_streams,
 *    change_stream_does_not_expect_txns,
 *    assumes_unsharded_collection,
 *    assumes_read_preference_unchanged,
 * ]
 */

(function() {
"use strict";

load("jstests/libs/collection_drop_recreate.js");  // For assertDropCollection.
load('jstests/libs/change_stream_util.js');        // For 'ChangeStreamTest' and
                                                   // 'assertChangeStreamEventEq'.
// Namespace naming shared by every helper below.
const dbName = jsTestName();
const collName = "test";
const collNS = dbName + "." + collName;
// Expected 'ns' field of the change stream events we assert on.
const ns = {
    db: dbName,
    coll: collName
};
const numDocs = 1;

// Two-shard cluster with single-node shard replica sets. Periodic no-op writes
// keep each shard's oplog advancing so change stream results surface promptly.
const st = new ShardingTest({
    shards: 2,
    rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}}
});

const mongosConn = st.s;
const db = mongosConn.getDB(dbName);
const test = new ChangeStreamTest(db);

// Returns the UUID that the server assigned to the named collection,
// as reported by listCollections on the test database.
function getCollectionUuid(coll) {
    const [collInfo] = db.getCollectionInfos({name: coll});
    return collInfo.info.uuid;
}

// Reads the next event from 'cursor' and asserts that it matches 'expectedEvent'
// augmented with the collection's UUID. Returns the observed event's resume
// token ('_id'). Unlike before, this no longer mutates the caller's
// 'expectedEvent' object — the UUID is added to a copy.
function assertMigrateEventObserved(cursor, expectedEvent) {
    const events = test.getNextChanges(cursor, 1);
    const event = events[0];
    // Check the presence and the type of 'wallTime' field. We have no way to check the correctness
    // of 'wallTime' value, so we delete it afterwards.
    assert(event.wallTime instanceof Date);
    delete event.wallTime;
    // Build an augmented copy rather than writing into the caller-owned object.
    const expectedEventWithUuid =
        Object.assign({}, expectedEvent, {collectionUUID: getCollectionUuid(collName)});
    assertChangeStreamEventEq(event, expectedEventWithUuid);
    return event._id;
}

// Resets the test collection: drops it, seeds it with a single document, and
// shards it on {_id: 1} so a chunk exists to migrate.
function prepareCollection() {
    assertDropCollection(db, collName);
    const coll = db[collName];
    assert.commandWorked(coll.insert({_id: 1}));
    assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {_id: 1}}));
}

// Test that if showSystemEvents is false, we do not see the migrateLastChunkFromShard event.
function validateShowSystemEventsFalse() {
    prepareCollection();
    let pipeline = [
        {$changeStream: {showExpandedEvents: true, showSystemEvents: false}},
        {$match: {operationType: {$nin: ["create", "createIndexes"]}}}
    ];
    let cursor = test.startWatchingChanges(
        {pipeline, collection: collName, aggregateOptions: {cursor: {batchSize: 0}}});

    // Migrate a chunk, then insert a new document.
    assert.commandWorked(
        db.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName}));
    assert.commandWorked(db[collName].insert({_id: numDocs + 1}));

    // Confirm that we don't observe the migrateLastChunkFromShard event in the stream, but only see
    // the subsequent insert.
    test.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: {
            operationType: "insert",
            ns: ns,
            fullDocument: {_id: numDocs + 1},
            documentKey: {_id: numDocs + 1},
        }
    });
}

// Test that if showSystemEvents is true, we see the migrateLastChunkFromShard event and can resume
// after it.
// Test that if showSystemEvents is true, we see the migrateLastChunkFromShard event and can resume
// after it. 'collParam' selects the watch target (a collection name, or 1 for a whole-DB stream);
// 'expectedOutput' is the event the stream is expected to produce.
function validateExpectedEventAndConfirmResumability(collParam, expectedOutput) {
    prepareCollection();

    // Filter out the DDL events generated by prepareCollection() itself.
    const excludeSetupEvents = {$match: {operationType: {$nin: ["create", "createIndexes"]}}};
    let watchCursor = test.startWatchingChanges({
        pipeline:
            [{$changeStream: {showExpandedEvents: true, showSystemEvents: true}}, excludeSetupEvents],
        collection: collParam,
        aggregateOptions: {cursor: {batchSize: 0}}
    });

    // Migrate a chunk from one shard to another.
    assert.commandWorked(
        db.adminCommand({moveChunk: collNS, find: {_id: 0}, to: st.shard1.shardName}));

    // Confirm that we observe the migrateLastChunkFromShard event, and obtain its resume token.
    const migrateResumeToken = assertMigrateEventObserved(watchCursor, expectedOutput);

    // Insert a document before starting the next change stream so that we can validate the
    // resuming behavior.
    assert.commandWorked(db[collName].insert({_id: numDocs + 1}));

    // Resume after the migrate event and confirm we see the subsequent insert.
    watchCursor = test.startWatchingChanges({
        pipeline: [{
            $changeStream:
                {showExpandedEvents: true, showSystemEvents: true, resumeAfter: migrateResumeToken}
        }],
        collection: collParam
    });

    test.assertNextChangesEqual({
        cursor: watchCursor,
        expectedChanges: {
            operationType: "insert",
            ns: ns,
            fullDocument: {_id: numDocs + 1},
            documentKey: {_id: numDocs + 1},
        }
    });
}

// Enable sharding and pin the primary to shard0 so the chunk donor — and hence
// the 'shardId' reported in the event — is deterministic.
assert.commandWorked(db.adminCommand({enableSharding: dbName}));
assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));

// Test the behaviour of migrateLastChunkFromShard for a single-collection stream
validateExpectedEventAndConfirmResumability(collName, {
    operationType: "migrateLastChunkFromShard",
    ns: ns,
    operationDescription: {
        "shardId": st.shard0.shardName,
    }
});

// Test the behaviour of migrateLastChunkFromShard for a whole-DB stream.
validateExpectedEventAndConfirmResumability(1, {
    operationType: "migrateLastChunkFromShard",
    ns: ns,
    operationDescription: {
        "shardId": st.shard0.shardName,
    }
});

// Test the behaviour of migrateLastChunkFromShard when showSystemEvents is false.
validateShowSystemEventsFalse();

st.stop();
}());