summaryrefslogtreecommitdiff
path: root/jstests/sharding/timeseries_change_stream_no_orphans.js
blob: 57c0cc7aee2a8a14dc598b2cdf80cc272e4356f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/**
 * Verifies that delete on orphaned buckets (1) do not show up unexpected events in change streams
 * and (2) have a certain behavior on the persisted data.
 *
 * The behavior is tested in the following scenarios:
 *   - Test case 1: Direct operations to shard on orphaned documents
 *   - Test case 2: Broadcasted operations (from router) on orphaned documents
 *
 * @tags: [
 *   # To avoid multiversion tests
 *   requires_fcv_70,
 *   # To avoid burn-in tests in in-memory build variants
 *   requires_persistence,
 * ]
 */
(function() {
'use strict';

load('jstests/libs/fail_point_util.js');  // For configureFailPoint

// Asserts that there is no change stream event.
function assertNoChanges(csCursor) {
    function advanceAndGetToken() {
        assert(!csCursor.hasNext(), () => csCursor.next());
        return csCursor.getResumeToken();
    }
    const startToken = advanceAndGetToken();
    assert.soon(() => {
        const currentToken = advanceAndGetToken();
        return bsonWoCompare(currentToken, startToken) > 0;
    });
}

const dbName = "ts_change_stream_no_orphans";
const collName = 'ts';
const sysCollName = 'system.buckets.' + collName;
const collNS = dbName + '.' + collName;
const sysCollNS = dbName + '.' + sysCollName;
const timeseriesOpt = {
    timeField: "time",
    metaField: "tag",
};

function validateBucketsCollectionSharded({shardKey}) {
    const configColls = st.s.getDB('config').collections;
    const output = configColls
                       .find({
                           _id: sysCollNS,
                           key: shardKey,
                           timeseriesFields: {$exists: true},
                       })
                       .toArray();
    assert.eq(output.length, 1, configColls.find().toArray());
    assert.eq(output[0].timeseriesFields.timeField, timeseriesOpt.timeField, output[0]);
    assert.eq(output[0].timeseriesFields.metaField, timeseriesOpt.metaField, output[0]);
}

// Enables explicitly the periodic no-op writer to allow the router to process change stream events
// coming from all shards. This is enabled for production clusters by default.
const st = new ShardingTest({
    mongos: 1,
    config: 1,
    shards: 2,
    rs: {nodes: 1, setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}},
    other: {enableBalancer: false}
});

// Suspends the range deletion on the first shard to force the orphaned buckets to stay at the
// first shard after the chunks have been moved to the second shard.
let suspendRangeDeletionShard0 = configureFailPoint(st.shard0, 'suspendRangeDeletion');

// Creates a shard collection with buckets having both a key field and a non-key field. The key
// is the metaField of the timeseries collection.
jsTest.log(`Shard a timeseries collection: ${collNS} with shard key: {tag: 1}`);
assert.commandWorked(
    st.s.adminCommand({enableSharding: dbName, primaryShard: st.shard0.shardName}));
const db = st.s.getDB(dbName);
db[collName].drop();
assert.commandWorked(db.createCollection(collName, {timeseries: timeseriesOpt}));
assert.commandWorked(st.s.adminCommand({shardCollection: collNS, key: {tag: 1}}));
// The meta field name on the system.buckets collection is 'meta' and hence the shardKey is 'meta'.
validateBucketsCollectionSharded({shardKey: {meta: 1}});

const mongosColl = st.s.getCollection(collNS);
assert.commandWorked(mongosColl.insert({_id: 0, tag: -1, time: ISODate(), f: 20}));
assert.commandWorked(mongosColl.insert({_id: 1, tag: -1, time: ISODate(), f: 25}));
assert.commandWorked(mongosColl.insert({_id: 2, tag: 0, time: ISODate(), f: 30}));  // Test case 1
assert.commandWorked(mongosColl.insert({_id: 3, tag: 0, time: ISODate(), f: 30}));  // Test case 1
assert.commandWorked(mongosColl.insert({_id: 4, tag: 0, time: ISODate(), f: 40}));  // Test case 2
assert.commandWorked(mongosColl.insert({_id: 5, tag: 1, time: ISODate(), f: 45}));
assert.commandWorked(mongosColl.insert({_id: 6, tag: 2, time: ISODate(), f: 50}));  // Test case 2
assert.commandWorked(mongosColl.insert({_id: 7, tag: 2, time: ISODate(), f: 50}));  // Test case 2
assert.commandWorked(mongosColl.insert({_id: 8, tag: 3, time: ISODate(), f: 60}));  // Test case 1

// Moves the chunk to the second shard leaving orphaned buckets on the first shard. The orphaned
// buckets are not deleted because the range deletion is suspended and they are buckets with tag =
// [0, 3].
//
// Note: system admin commands such as 'split' and 'moveChunk' should run on the 'system.buckets'
// collection unlike the 'shardCollection' command.
assert.commandWorked(st.s.adminCommand({split: sysCollNS, middle: {meta: 0}}));
assert.commandWorked(
    st.s.adminCommand({moveChunk: sysCollNS, find: {meta: 0}, to: st.shard1.shardName}));

// Sets up a change stream on the mongos database to receive real-time events on any data changes.
//
// Note: the change stream is on the database because watching the change stream events on the
// system.buckets collection is not allowed.
const mongosDbChangeStream = db.watch([], {showSystemEvents: true});

////////////////////////////////////////////////////////////////////////////////////////////////////
// Test case 1: Direct operations to shard on orphaned buckets
////////////////////////////////////////////////////////////////////////////////////////////////////

(function testDirectDeleteToShardOnOrphanedBuckets() {
    jsTest.log('A direct delete to a shard of an orphaned bucket does not generate a delete event');

    // Sends a direct delete to first shard on a measurement on an orphaned bucket.
    assert.commandWorked(st.shard0.getCollection(collNS).remove({f: 60}));

    // No event is generated on the database change stream.
    assertNoChanges(mongosDbChangeStream);

    // The entire orphaned bucket on the first shard has been removed with meta == 3 because the
    // measurement with {f: 60} was the only one in the bucket.
    assert.eq(null, st.shard0.getCollection(sysCollNS).findOne({meta: 3}));

    // But the measurement with {f: 60} still exists on the second shard and show up in the
    // cluster find.
    assert.eq(1, mongosColl.find({f: 60}).itcount(), mongosColl.find().toArray());
})();

(function testDirectMultiDeleteToShardOnOrphanedBuckets() {
    jsTest.log('A direct delete to a shard of multi-documents does not generate delete events');

    // Sends a direct delete to the first shard on measurements on an orphaned bucket.
    assert.commandWorked(st.shard0.getCollection(collNS).remove({f: 30}));

    // No event is generated on the database change stream.
    assertNoChanges(mongosDbChangeStream);

    // The orphaned bucket on the first shard have been updated since two measurements ({f: 30})
    // has been removed from the bucket and only the measurement with {_id: 4, f: 40} stays in
    // the bucket.
    const actualBucket = st.shard0.getCollection(sysCollNS).findOne({meta: 0});
    assert.eq(0, actualBucket.meta, actualBucket);
    assert.eq(4, actualBucket.control.min._id, actualBucket);
    assert.eq(4, actualBucket.control.max._id, actualBucket);
    assert.eq(40, actualBucket.control.min.f, actualBucket);
    assert.eq(40, actualBucket.control.max.f, actualBucket);
    assert.eq(4, actualBucket.data._id[0], actualBucket);
    assert.eq(40, actualBucket.data.f[0], actualBucket);

    // But the two measurements with {f: 30} still exists on the second shard and show up in the
    // cluster find.
    assert.eq(2, mongosColl.find({f: 30}).itcount(), mongosColl.find().toArray());
})();

////////////////////////////////////////////////////////////////////////////////////////////////////
// Test case 2: Broadcasted operations (from router) on orphaned buckets
////////////////////////////////////////////////////////////////////////////////////////////////////

(function testBroadcastedDeleteOnOrphanedBuckets() {
    jsTest.log(
        'A broadcasted delete of a single measurement generates a replace event of a bucket');

    // Sends a broadcasted delete (query on non-shardkey field) on a single measurement to all the
    // shards.
    assert.commandWorked(mongosColl.remove({_id: 4}));

    // The document is hosted by the second shard and the replace event is notified because a single
    // measurement is deleted from the containing bucket. The first shard still hosts the orphaned
    // document but no additional event must be notified.
    assert.soon(() => mongosDbChangeStream.hasNext(), 'A replace event of a bucket is expected');
    const change = mongosDbChangeStream.next();
    assert.eq(change.operationType, 'replace', change);
    assertNoChanges(mongosDbChangeStream);

    // The orphaned bucket on the first shard have not been updated, unlike the mongos collection.
    assert.eq(null, mongosColl.findOne({_id: 4}), mongosColl.find().toArray());
    const shard0Bucket = st.shard0.getCollection(sysCollNS).findOne({meta: 0});
    assert.eq(0, shard0Bucket.meta, shard0Bucket);
    assert.eq(4, shard0Bucket.control.min._id, shard0Bucket);
    assert.eq(4, shard0Bucket.control.max._id, shard0Bucket);
    assert.eq(40, shard0Bucket.control.min.f, shard0Bucket);
    assert.eq(40, shard0Bucket.control.max.f, shard0Bucket);
    assert.eq(4, shard0Bucket.data._id[0], shard0Bucket);
    assert.eq(40, shard0Bucket.data.f[0], shard0Bucket);
})();

(function testBroadcastedMultiDeleteOnOrphanedBuckets() {
    jsTest.log('A broadcasted delete of multi-documents generates a delete event of a bucket');

    // Sends a broadcasted delete (query on non-shardkey field) on two documents to all the shards.
    assert.commandWorked(mongosColl.remove({f: 50}));

    // The documents are hosted by the second shard and a bucket delete event is notified because
    // the all measurements are deleted from the bucket. The first shard still hosts the orphaned
    // documents but no additional event must be notified.
    assert.soon(() => mongosDbChangeStream.hasNext(), 'A delete event of a bucket is expected');
    const change = mongosDbChangeStream.next();
    assert.eq(change.operationType, 'delete', change);
    assertNoChanges(mongosDbChangeStream);

    // The orphaned bucket on first shard have not been removed, unlike the mongos collection.
    assert.eq(null, mongosColl.findOne({_id: 6}), mongosColl.find().toArray());
    assert.eq(null, mongosColl.findOne({_id: 7}), mongosColl.find().toArray());
    const shard0Bucket = st.shard0.getCollection(sysCollNS).findOne({meta: 2});
    assert.eq(2, shard0Bucket.meta, shard0Bucket);
    assert.eq(6, shard0Bucket.control.min._id, shard0Bucket);
    assert.eq(7, shard0Bucket.control.max._id, shard0Bucket);
    assert.eq(50, shard0Bucket.control.min.f, shard0Bucket);
    assert.eq(50, shard0Bucket.control.max.f, shard0Bucket);
    assert.eq(6, shard0Bucket.data._id[0], shard0Bucket);
    assert.eq(50, shard0Bucket.data.f[0], shard0Bucket);
})();

suspendRangeDeletionShard0.off();

st.stop();
})();