1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
|
/*
* Test that crud operations in transactions target the right shards during migration.
* @tags: [uses_transactions, uses_prepare_transaction]
*/
(function() {
'use strict';
load('jstests/libs/chunk_manipulation_util.js');
load("jstests/sharding/libs/chunk_bounds_util.js");
load("jstests/sharding/libs/find_chunks_util.js"); // for findChunksForNs
function runCommandInTxn(cmdFunc) {
let session = st.s.startSession();
session.startTransaction();
cmdFunc(session);
assert.commandWorked(session.commitTransaction_forTesting());
session.endSession();
}
let st = new ShardingTest({shards: 3});
let dbName = "test";
let collName = "user";
let ns = dbName + "." + collName;
let configDB = st.s.getDB('config');
let testDB = st.s.getDB(dbName);
// For startParallelOps to write its state.
let staticMongod = MongoRunner.runMongod({});
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.shard1.shardName);
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 'hashed'}}));
let chunkDocs = findChunksUtil.findChunksByNs(configDB, ns).toArray();
let shardChunkBounds = chunkBoundsUtil.findShardChunkBounds(chunkDocs);
jsTest.log("Test 'insert'");
// Insert a doc while migrating the chunk that the doc belongs to.
let doc = {x: 0};
let hash = convertShardKeyToHashed(doc.x);
let shardBoundsPair =
chunkBoundsUtil.findShardAndChunkBoundsForShardKey(st, shardChunkBounds, {x: hash});
let fromShard = shardBoundsPair.shard;
let toShard = st.getOther(fromShard);
runCommandDuringTransferMods(st.s,
staticMongod,
ns,
null /* findCriteria */,
shardBoundsPair.bounds,
fromShard,
toShard,
() => {
runCommandInTxn((session) => {
let sessionColl =
session.getDatabase(dbName).getCollection(collName);
assert.commandWorked(sessionColl.insert(doc));
});
});
// Check that the inserted doc is on the recipient shard.
assert.eq(1, testDB.user.find(doc).count());
assert.eq(1, toShard.getCollection(ns).find(doc).count());
// Clean up.
assert.commandWorked(testDB.user.remove({}));
chunkDocs = findChunksUtil.findChunksByNs(configDB, ns).toArray();
shardChunkBounds = chunkBoundsUtil.findShardChunkBounds(chunkDocs);
// Insert docs that are expected to go to three different shards, check that the docs
// are on the right shards and store the shard and chunk bounds for each doc.
let docs = [{x: -10}, {x: -1}, {x: 10}];
assert.commandWorked(testDB.user.insert(docs));
let shards = [];
let docChunkBounds = [];
for (let doc of docs) {
let hash = convertShardKeyToHashed(doc.x);
let shardBoundsPair =
chunkBoundsUtil.findShardAndChunkBoundsForShardKey(st, shardChunkBounds, {x: hash});
assert.eq(1, shardBoundsPair.shard.getCollection(ns).find(doc).count());
shards.push(shardBoundsPair.shard);
docChunkBounds.push(shardBoundsPair.bounds);
}
assert.eq(3, (new Set(shards)).size);
assert.eq(3, testDB.user.find({}).count());
// Perform a series of operations on docs[1] while moving the chunk that it belongs to
// from shards[1] to shards[2], then to shards[0] and back to shards[1].
jsTest.log("Test 'update'");
// Update the doc while migrating the chunk.
fromShard = shards[1];
toShard = shards[2];
runCommandDuringTransferMods(
st.s, staticMongod, ns, null /* findCriteria */, docChunkBounds[1], fromShard, toShard, () => {
runCommandInTxn((session) => {
let sessionColl = session.getDatabase(dbName).getCollection(collName);
assert.commandWorked(
sessionColl.update({x: -1}, {$set: {updated: true}}, {multi: true}));
});
});
// Check that the doc is updated correctly.
assert.eq(1, testDB.user.find({x: -1, updated: true}).count());
assert.eq(0, shards[0].getCollection(ns).find({updated: true}).count());
assert.eq(0, shards[1].getCollection(ns).find({updated: true}).count());
assert.eq(1, shards[2].getCollection(ns).find({updated: true}).count());
jsTest.log("Test 'findAndModify'");
// findAndModify the doc while migrating the chunk.
fromShard = shards[2];
toShard = shards[0];
runCommandDuringTransferMods(
st.s, staticMongod, ns, null /* findCriteria */, docChunkBounds[1], fromShard, toShard, () => {
runCommandInTxn((session) => {
let sessionDB = session.getDatabase(dbName);
assert.commandWorked(sessionDB.runCommand(
{findAndModify: collName, query: {x: -1}, update: {$set: {y: 1}}}));
});
});
// Check that the doc is updated correctly.
assert.eq(1, testDB.user.find({x: -1, y: 1}).count());
assert.eq(1, shards[0].getCollection(ns).count({y: 1}));
assert.eq(0, shards[1].getCollection(ns).count({y: 1}));
assert.eq(0, shards[2].getCollection(ns).count({y: 1}));
jsTest.log("Test 'remove'");
// Remove the doc while migrating the chunk.
fromShard = shards[0];
toShard = shards[1];
runCommandDuringTransferMods(
st.s, staticMongod, ns, null /* findCriteria */, docChunkBounds[1], fromShard, toShard, () => {
runCommandInTxn((session) => {
let sessionColl = session.getDatabase(dbName).getCollection(collName);
assert.commandWorked(sessionColl.remove({x: -1}));
});
});
// Check that the doc is removed correctly.
assert.eq(2, testDB.user.find({}).count());
assert.eq(1, shards[0].getCollection(ns).find({}).count());
assert.eq(0, shards[1].getCollection(ns).find({}).count());
assert.eq(1, shards[2].getCollection(ns).find({}).count());
st.stop();
MongoRunner.stopMongod(staticMongod);
})();
|