path: root/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js
'use strict';

/**
 * Provides an init state that partitions the data space into chunks evenly across threads.
 *
 *      t1's data partition encapsulated in own chunk
 *       v
 *   ------------) | [------------) | [------------  < t3's data partition in own chunk
 *                      ^
 *                     t2's data partition encapsulated in own chunk
 *
 * Intended to allow mergeChunks, moveChunk, and splitChunk operations to stay
 * within the bounds of a thread's partition.
 *
 *   <==t1's partition==>                           <==t3's partition==>
 *
 *   ---)[--)[----)[---) | [---)[---)[----)[-)[) | [-------)[-)[--------
 *
 *                         <===t2's partition==>
 *
 * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off]
 */
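// Illustrative example (hypothetical values, not part of this base config): with
// threadCount = 3 and partitionSize = 100, makePartition() gives thread 0 the range [0, 100),
// thread 1 [100, 200), and thread 2 [200, 300). Thread 0's chunk is widened to start at MinKey
// and thread 2's chunk to end at MaxKey, so the chunks together cover the entire shard key space.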

load('jstests/concurrency/fsm_workload_helpers/chunks.js');  // for chunk helpers

var $config = (function() {
    var data = {
        partitionSize: 1,
        // We use a non-hashed shard key of { _id: 1 } so that documents reside in their expected
        // chunks. The setup function creates documents with sequential _id values and gives
        // each thread its own numeric range to work with.
        shardKey: {_id: 1},
        shardKeyField: '_id',
    };

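    // For illustration only (assumed inputs): makePartition('test.coll', 2, 100) with
    // this.threadCount === 4 would return
    //   {ns: 'test.coll', lower: 200, upper: 300, isLowChunk: false, isHighChunk: false,
    //    chunkLower: 200, chunkUpper: 300}.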
    data.makePartition = function makePartition(ns, tid, partitionSize) {
        var partition = {ns: ns};
        partition.lower = tid * partitionSize;
        partition.upper = (tid * partitionSize) + partitionSize;

        partition.isLowChunk = (tid === 0);
        partition.isHighChunk = (tid === (this.threadCount - 1));

        partition.chunkLower = partition.isLowChunk ? MinKey : partition.lower;
        partition.chunkUpper = partition.isHighChunk ? MaxKey : partition.upper;

        // With more than 1 thread, verify that this thread's partition isn't both the high and
        // low chunk; with a single thread, verify that it is both.
        if (this.threadCount > 1) {
            assertAlways(!(partition.isLowChunk && partition.isHighChunk),
                         'should not be both the high and low chunk when there is more than 1 ' +
                             'thread:\n' + tojson(this));
        } else {
            assertAlways(partition.isLowChunk && partition.isHighChunk,
                         'should be both the high and low chunk when there is only 1 thread:\n' +
                             tojson(this));
        }

        return partition;
    };

    // Intended for use on config servers only.
    // Get a random chunk within this thread's partition.
    data.getRandomChunkInPartition = function getRandomChunkInPartition(conn) {
        assert(isMongodConfigsvr(conn.getDB('admin')), 'Not connected to a mongod configsvr');
        assert(this.partition,
               'This function must be called from workloads that partition data across threads.');
        var coll = conn.getDB('config').chunks;
        // We must split up these cases because MinKey and MaxKey are not fully comparable.
        // This may be due to SERVER-18341, where the Matcher returns false positives in
        // comparison predicates with MinKey/MaxKey.
        const maxField = 'max.' + this.shardKeyField;
        const minField = 'min.' + this.shardKeyField;
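        // Chunk documents in config.chunks look roughly like
        //   {_id: ..., ns: 'db.coll', min: {_id: <lower bound>}, max: {_id: <upper bound>},
        //    shard: ...},
        // so minField and maxField address a chunk's bounds along the shard key.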
        if (this.partition.isLowChunk && this.partition.isHighChunk) {
            return coll
                .aggregate([
                    {$match: {ns: this.partition.ns}},
                    {$sample: {size: 1}},
                ])
                .toArray()[0];
        } else if (this.partition.isLowChunk) {
            return coll
                .aggregate([
                    {
                        $match:
                            {ns: this.partition.ns, [maxField]: {$lte: this.partition.chunkUpper}}
                    },
                    {$sample: {size: 1}}
                ])
                .toArray()[0];
        } else if (this.partition.isHighChunk) {
            return coll
                .aggregate([
                    {
                        $match:
                            {ns: this.partition.ns, [minField]: {$gte: this.partition.chunkLower}}
                    },
                    {$sample: {size: 1}}
                ])
                .toArray()[0];
        } else {
            return coll
                .aggregate([
                    {
                        $match: {
                            ns: this.partition.ns,
                            [minField]: {$gte: this.partition.chunkLower},
                            [maxField]: {$lte: this.partition.chunkUpper}
                        }
                    },
                    {$sample: {size: 1}}
                ])
                .toArray()[0];
        }
    };

    // Extended workloads override this to create additional split points within each thread's
    // partition during setup.
    data.setupAdditionalSplitPoints = function setupAdditionalSplitPoints(
        db, collName, partition) {};
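
    // A derived workload might override it along these lines (hypothetical sketch, assuming the
    // derived workload also uses a partitionSize large enough to be split further):
    //
    //     data.setupAdditionalSplitPoints = function setupAdditionalSplitPoints(
    //         db, collName, partition) {
    //         // Split this thread's chunk once more, at the midpoint of its partition.
    //         var midpoint = Math.floor((partition.lower + partition.upper) / 2);
    //         assertWhenOwnColl.commandWorked(
    //             ChunkHelper.splitChunkAtPoint(db, collName, midpoint));
    //     };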

    var states = (function() {
        // Inform this thread about its partition,
        // and verify that its partition is encapsulated in a single chunk.
        function init(db, collName, connCache) {
            var ns = db[collName].getFullName();

            // Inform this thread about its partition.
            // The tid of each thread is assumed to be in the range [0, this.threadCount).
            this.partition = this.makePartition(ns, this.tid, this.partitionSize);
            Object.freeze(this.partition);

            // Verify that there is exactly 1 chunk in our partition.
            var config = ChunkHelper.getPrimary(connCache.config);
            var numChunks = ChunkHelper.getNumChunks(
                config, ns, this.partition.chunkLower, this.partition.chunkUpper);
            var chunks = ChunkHelper.getChunks(config, ns, MinKey, MaxKey);
            var msg = tojson({tid: this.tid, data: this, chunks: chunks});
            assertWhenOwnColl.eq(numChunks, 1, msg);
        }

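        // No-op state; this base workload only verifies the partition setup in init. Extended
        // workloads supply their own states for chunk operations.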
        function dummy(db, collName, connCache) {
        }

        return {init: init, dummy: dummy};
    })();

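    // In the base workload, init simply hands off to the no-op dummy state, which then loops on
    // itself; extended workloads define transitions between their own chunk-operation states.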
    var transitions = {init: {dummy: 1}, dummy: {dummy: 1}};

    // Define each thread's data partition, populate it, and encapsulate it in a chunk.
    var setup = function setup(db, collName, cluster) {
        var dbName = db.getName();
        var ns = db[collName].getFullName();
        var configDB = db.getSiblingDB('config');

        // Sharding must be enabled on db.
        var res = configDB.databases.findOne({_id: dbName});
        var msg = 'db ' + dbName + ' must be sharded.';
        assertAlways(res !== null && res.partitioned, msg);

        // Sharding must be enabled on db[collName].
        msg = 'collection ' + collName + ' must be sharded.';
        assertAlways.gte(configDB.chunks.find({ns: ns}).itcount(), 1, msg);

        for (var tid = 0; tid < this.threadCount; ++tid) {
            // Define this thread's partition.
            // The tid of each thread is assumed to be in the range [0, this.threadCount).
            var partition = this.makePartition(ns, tid, this.partitionSize);

            // Populate this thread's partition.
            var bulk = db[collName].initializeUnorderedBulkOp();
            for (var i = partition.lower; i < partition.upper; ++i) {
                bulk.insert({_id: i});
            }
            assertAlways.commandWorked(bulk.execute());

            // Add a split point at the lower end of this thread's partition.
            // Since a split point is created at the low end of each partition,
            // each partition ends up encapsulated in its own chunk.
            // It's unnecessary to add a split point for the thread with the lowest partition,
            // because its chunk's lower bound is already MinKey.
            if (!partition.isLowChunk) {
                assertWhenOwnColl.commandWorked(
                    ChunkHelper.splitChunkAtPoint(db, collName, partition.lower));
            }

            this.setupAdditionalSplitPoints(db, collName, partition);
        }
    };

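    // Base configuration: one thread and one iteration, which extended workloads are expected to
    // override. passConnectionCache: true makes the FSM runner pass a per-host connection cache
    // (the connCache argument) to each state function.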
    return {
        threadCount: 1,
        iterations: 1,
        startState: 'init',
        states: states,
        transitions: transitions,
        data: data,
        setup: setup,
        passConnectionCache: true
    };
})();