author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2022-05-19 13:53:19 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-19 15:15:57 +0000
commit | 9e4f58aa6bb73cbcd7e3f9f5a6dbcbc5c88b8254 (patch)
tree | 017b7540bcf46f41c28c9c973c531b8744d2a913
parent | 0419b6d14796177749c1921fb787a8c1d0e1faa3 (diff)
SERVER-65816 Change balancer policy to balance on data size rather than number of chunks
31 files changed, 922 insertions, 184 deletions
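The heart of the change: instead of comparing chunk counts, the balancer now compares the per-shard data size of each collection (collected via the new `ShardsvrGetStatsForBalancing` request added in this patch) and picks a donor/recipient pair, converging once the sizes differ by less than twice the collection's configured max chunk size (see `_singleZoneBalanceBasedOnDataSize` in the diff below). A minimal standalone sketch of that selection rule follows; the names `SizeMap`, `Migration`, and `pickMigration` are illustrative, not taken from the patch:

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// shardId -> bytes stored for one collection on that shard
using SizeMap = std::map<std::string, int64_t>;

struct Migration {
    std::string from;  // most loaded shard (donor)
    std::string to;    // least loaded shard (recipient)
};

// Suggest a migration only while the donor/recipient gap is at least
// 2 * maxChunkSizeBytes; below that threshold the collection is
// considered balanced and the policy converges.
std::optional<Migration> pickMigration(const SizeMap& sizes, int64_t maxChunkSizeBytes) {
    if (sizes.size() < 2)
        return std::nullopt;
    auto donor = sizes.begin();
    auto recipient = sizes.begin();
    for (auto it = sizes.begin(); it != sizes.end(); ++it) {
        if (it->second > donor->second)
            donor = it;
        if (it->second < recipient->second)
            recipient = it;
    }
    if (donor->second - recipient->second < 2 * maxChunkSizeBytes)
        return std::nullopt;
    return Migration{donor->first, recipient->first};
}
```

The 2x threshold presumably exists because a single migration can move up to one max-chunk-size worth of data, which shrinks the donor/recipient gap by up to twice that amount; requiring a larger gap keeps one move from inverting the imbalance and causing ping-pong migrations.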
diff --git a/jstests/concurrency/fsm_workloads/insert_with_data_size_aware_balancing.js b/jstests/concurrency/fsm_workloads/insert_with_data_size_aware_balancing.js
new file mode 100644
index 00000000000..133991d73a6
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/insert_with_data_size_aware_balancing.js
@@ -0,0 +1,178 @@
+'use strict';
+
+/**
+ * - Shard several collections with different (random) configured maxChunkSize
+ * - Perform continuous inserts of random amounts of data into the collections
+ * - Verify that the balancer fairly redistributes data among available shards
+ *
+ * @tags: [
+ *   requires_sharding,
+ *   assumes_balancer_on,
+ *   featureFlagBalanceAccordingToDataSize,
+ *   does_not_support_stepdowns,
+ *   requires_fcv_61,
+ * ]
+ */
+
+const bigString = 'X'.repeat(1024 * 1024 - 30);  // Almost 1MB, to create documents of exactly 1MB
+const minChunkSizeMB = 1;
+const maxChunkSizeMB = 10;
+const dbNames = ['db0', 'db1'];
+const collNames = ['collA', 'collB', 'collC'];
+
+/*
+ * Get a random db/coll name from the test lists.
+ *
+ * Using the thread id to introduce more randomness: it has been observed that concurrent calls to
+ * Random.randInt(array.length) often return the same number to different threads.
+ */
+function getRandomDbName(tid) {
+    return dbNames[Random.randInt(tid * tid) % dbNames.length];
+}
+function getRandomCollName(tid) {
+    return collNames[Random.randInt(tid * tid) % collNames.length];
+}
+
+var $config = (function() {
+    let states = {
+        /*
+         * Insert into a test collection a random amount of documents (up to 10MB per iteration)
+         */
+        insert: function(db, collName, connCache) {
+            const dbName = getRandomDbName(this.tid);
+            db = db.getSiblingDB(dbName);
+            collName = getRandomCollName(this.tid);
+            const coll = db[collName];
+
+            const numDocs = Random.randInt(maxChunkSizeMB - 1) + 1;
+            let insertBulkOp = coll.initializeUnorderedBulkOp();
+            for (let i = 0; i < numDocs; ++i) {
+                insertBulkOp.insert({s: bigString});
+            }
+
+            assertAlways.commandWorked(insertBulkOp.execute());
+        },
+    };
+
+    /*
+     * Create sharded collections with random maxChunkSizeMB (between 1MB and 10MB)
+     */
+    let setup = function(db, collName, cluster) {
+        const mongos = cluster.getDB('config').getMongo();
+        const shardNames = Object.keys(cluster.getSerializedCluster().shards);
+        const numShards = shardNames.length;
+
+        for (let i = 0; i < dbNames.length; i++) {
+            // Initialize database
+            const dbName = dbNames[i];
+            const newDb = db.getSiblingDB(dbName);
+            newDb.adminCommand({enablesharding: dbName, primaryShard: shardNames[i % numShards]});
+
+            for (let j = 0; j < collNames.length; j++) {
+                // Shard collection
+                collName = collNames[j];
+                const coll = newDb[collName];
+                const ns = coll.getFullName();
+                db.adminCommand({shardCollection: ns, key: {_id: 1}});
+
+                // Configure random maxChunkSize
+                const randomMaxChunkSizeMB = Random.randInt(maxChunkSizeMB - 1) + 1;
+                assert.commandWorked(mongos.adminCommand({
+                    configureCollectionBalancing: ns,
+                    chunkSize: randomMaxChunkSizeMB,
+                }));
+            }
+        }
+    };
+
+    /*
+     * Verify that the balancer fairly redistributes data among available shards: the
+     * collection size difference between two shards must be at most 2 * maxChunkSize
+     */
+    let teardown = function(db, collName, cluster) {
+        const mongos = cluster.getDB('config').getMongo();
+        // Sentinel variable to make sure not all collections have been skipped
+        let testedAtLeastOneCollection = false;
+        for (let i = 0; i < dbNames.length; i++) {
+            const dbName = dbNames[i];
+            for (let j = 0; j < collNames.length; j++) {
+                collName = collNames[j];
+                const ns = dbName + '.' + collName;
+
+                const coll = mongos.getCollection(ns);
+                if (coll.countDocuments({}) === 0) {
+                    // Skip empty collections
+                    continue;
+                }
+                testedAtLeastOneCollection = true;
+
+                // Wait for collection to be considered balanced
+                assert.soon(
+                    function() {
+                        return assert
+                            .commandWorked(mongos.adminCommand({balancerCollectionStatus: ns}))
+                            .balancerCompliant;
+                    },
+                    'Timed out waiting for collections to be balanced',
+                    60000 * 5 /* timeout (5 minutes) */,
+                    1000 /* interval */);
+
+                const statsPipeline = [
+                    {'$collStats': {'storageStats': {}}},
+                    {
+                        '$project': {
+                            'shard': true,
+                            'storageStats': {
+                                'count': true,
+                                'size': true,
+                                'avgObjSize': true,
+                                'numOrphanDocs': true
+                            }
+                        }
+                    }
+                ];
+
+                // Get stats for the collection from each shard
+                const storageStats = coll.aggregate(statsPipeline).toArray();
+                let minSizeOnShardForCollection = Number.MAX_VALUE;
+                let maxSizeOnShardForCollection = Number.MIN_VALUE;
+
+                storageStats.forEach(function(shardStats) {
+                    const orphansSize = shardStats['storageStats']['numOrphanDocs'] *
+                        shardStats['storageStats']['avgObjSize'];
+                    const size = shardStats['storageStats']['size'] - orphansSize;
+                    if (size > maxSizeOnShardForCollection) {
+                        maxSizeOnShardForCollection = size;
+                    }
+                    if (size < minSizeOnShardForCollection) {
+                        minSizeOnShardForCollection = size;
+                    }
+                });
+
+                // Check that there is no imbalance
+                const collEntry = cluster.getDB('config').collections.findOne({'_id': ns});
+                const errMsg = "ns=" + ns + ' , collEntry=' + JSON.stringify(collEntry) +
+                    ', storageStats=' + JSON.stringify(storageStats);
+                assert.lte(maxSizeOnShardForCollection - minSizeOnShardForCollection,
+                           2 * collEntry.maxChunkSizeBytes,
+                           errMsg);
+            }
+
+            assert(testedAtLeastOneCollection);
+        }
+    };
+
+    let transitions = {insert: {insert: 1.0}};
+
+    return {
+        threadCount: 5,
+        iterations: 8,
+        startState: 'insert',
+        states: states,
+        transitions: transitions,
+        data: {},
+        setup: setup,
+        teardown: teardown,
+        passConnectionCache: true
+    };
+})();
diff --git a/jstests/noPassthroughWithMongod/no_balance_collection.js b/jstests/noPassthroughWithMongod/no_balance_collection.js
index 0106f2c6e9e..e4f2aaf5ce6 100644
--- a/jstests/noPassthroughWithMongod/no_balance_collection.js
+++ b/jstests/noPassthroughWithMongod/no_balance_collection.js
@@ -1,10 +1,22 @@
 // Tests whether the noBalance flag disables balancing for collections
 // @tags: [requires_sharding]
+(function() {
+"use strict";
+
 load("jstests/sharding/libs/find_chunks_util.js");
+load("jstests/libs/feature_flag_util.js");
 
 var st = new ShardingTest({shards: 2, mongos: 1});
 
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
+
 // First, test that shell helpers require an argument
 assert.throws(sh.disableBalancing, [], "sh.disableBalancing requires a collection");
 assert.throws(sh.enableBalancing, [], "sh.enableBalancing requires a collection");
@@ -109,3 +121,4 @@ if (lastMigration == null) {
 }
 
 st.stop();
+}());
diff --git a/jstests/sharding/auth.js b/jstests/sharding/auth.js
index 48351d0a59b..dcb65c2f2a7 100644
--- a/jstests/sharding/auth.js
+++ b/jstests/sharding/auth.js
@@ -10,6 +10,7 @@
 'use strict';
 
 load("jstests/replsets/rslib.js");
 load("jstests/sharding/libs/find_chunks_util.js");
+load("jstests/libs/feature_flag_util.js"); // Replica set nodes started with --shardsvr do not enable key generation until they are added // to a sharded cluster and reject commands with gossiped clusterTime from users without the @@ -184,15 +185,21 @@ assert.commandWorked(bulk.execute()); s.startBalancer(60000); -assert.soon(function() { - var d1Chunks = findChunksUtil.countChunksForNs(s.getDB("config"), 'test.foo', {shard: "d1"}); - var d2Chunks = findChunksUtil.countChunksForNs(s.getDB("config"), 'test.foo', {shard: "d2"}); - var totalChunks = findChunksUtil.countChunksForNs(s.getDB("config"), 'test.foo'); +// TODO SERVER-66378 adapt this test for data size aware balancing +const balanceAccordingToDataSize = TestData.setParameters.featureFlagBalanceAccordingToDataSize; +if (!balanceAccordingToDataSize) { + assert.soon(function() { + var d1Chunks = + findChunksUtil.countChunksForNs(s.getDB("config"), 'test.foo', {shard: "d1"}); + var d2Chunks = + findChunksUtil.countChunksForNs(s.getDB("config"), 'test.foo', {shard: "d2"}); + var totalChunks = findChunksUtil.countChunksForNs(s.getDB("config"), 'test.foo'); - print("chunks: " + d1Chunks + " " + d2Chunks + " " + totalChunks); + print("chunks: " + d1Chunks + " " + d2Chunks + " " + totalChunks); - return d1Chunks > 0 && d2Chunks > 0 && (d1Chunks + d2Chunks == totalChunks); -}, "Chunks failed to balance", 60000, 5000); + return d1Chunks > 0 && d2Chunks > 0 && (d1Chunks + d2Chunks == totalChunks); + }, "Chunks failed to balance", 60000, 5000); +} // SERVER-33753: count() without predicate can be wrong on sharded collections. // assert.eq(s.getDB("test").foo.count(), num+1); diff --git a/jstests/sharding/authCommands.js b/jstests/sharding/authCommands.js index 7d78366b99e..d3cfd168276 100644 --- a/jstests/sharding/authCommands.js +++ b/jstests/sharding/authCommands.js @@ -4,6 +4,8 @@ (function() { 'use strict'; +load("jstests/libs/feature_flag_util.js"); + // Multiple users cannot be authenticated on one connection within a session. TestData.disableImplicitSessions = true; @@ -17,6 +19,14 @@ load("jstests/sharding/libs/find_chunks_util.js"); // gossip that time later in setup. 
 //
+// TODO SERVER-66378 adapt this test for data size aware balancing
+const dataSizeAwareBalancingFeatureFlag =
+    TestData.setParameters.featureFlagBalanceAccordingToDataSize;
+if (dataSizeAwareBalancingFeatureFlag) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    return;
+}
+
 var st = new ShardingTest({
     shards: 2,
     rs: {oplogSize: 10, useHostname: false},
diff --git a/jstests/sharding/auto_rebalance_parallel.js b/jstests/sharding/auto_rebalance_parallel.js
index 1e55e8cbc11..d36d8adac57 100644
--- a/jstests/sharding/auto_rebalance_parallel.js
+++ b/jstests/sharding/auto_rebalance_parallel.js
@@ -5,9 +5,18 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
 load("jstests/sharding/libs/find_chunks_util.js");
 
 var st = new ShardingTest({shards: 4});
 
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
+
 var config = st.s0.getDB('config');
 
 assert.commandWorked(st.s0.adminCommand({enableSharding: 'TestDB'}));
diff --git a/jstests/sharding/balancer_collection_status.js b/jstests/sharding/balancer_collection_status.js
index 25622820605..4d47f1a2866 100644
--- a/jstests/sharding/balancer_collection_status.js
+++ b/jstests/sharding/balancer_collection_status.js
@@ -5,8 +5,9 @@
 (function() {
 'use strict';
 
-const chunkSizeMB = 1;
+load("jstests/libs/feature_flag_util.js");
 
+const chunkSizeMB = 1;
 let st = new ShardingTest({
     shards: 3,
     other: {
@@ -15,6 +16,14 @@ let st = new ShardingTest({
     }
 });
 
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
+
 function runBalancer(rounds) {
     st.startBalancer();
     let numRounds = 0;
diff --git a/jstests/sharding/balancer_window.js b/jstests/sharding/balancer_window.js
index ee48db64844..a52e9eeee70 100644
--- a/jstests/sharding/balancer_window.js
+++ b/jstests/sharding/balancer_window.js
@@ -13,6 +13,7 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
 load("jstests/sharding/libs/find_chunks_util.js");
 
 /**
@@ -46,6 +47,13 @@ var HourAndMinute = function(hour, minutes) {
 };
 
 var st = new ShardingTest({shards: 2});
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
 var configDB = st.s.getDB('config');
 assert.commandWorked(configDB.adminCommand({enableSharding: 'test'}));
 assert.commandWorked(configDB.adminCommand({shardCollection: 'test.user', key: {_id: 1}}));
diff --git a/jstests/sharding/balancing_based_on_size.js b/jstests/sharding/balancing_based_on_size.js
new file mode 100644
index 00000000000..017deb17ad4
--- /dev/null
+++ b/jstests/sharding/balancing_based_on_size.js
@@ -0,0 +1,122 @@
+/*
+ * Test that the balancer is redistributing data based on the actual amount of data
+ * for a collection on each node, converging when the size difference becomes small.
+ *
+ * @tags: [
+ *   featureFlagBalanceAccordingToDataSize,
+ *   requires_fcv_61,
+ * ]
+ */
+
+(function() {
+'use strict';
+
+load("jstests/sharding/libs/find_chunks_util.js");
+
+function getCollSizeMB(ns, node) {
+    let res;
+    let collections = [{ns: ns}];
+    assert.soon(() => {
+        res = assert.commandWorkedOrFailedWithCode(
+            node.adminCommand({_shardsvrGetStatsForBalancing: 1, collections: collections}),
+            [ErrorCodes.NotYetInitialized]);
+        return res.ok;
+    });
+
+    return res['stats'][0]['collSize'];
+}
+
+const maxChunkSizeMB = 1;
+const st = new ShardingTest(
+    {shards: 2, mongos: 1, other: {chunkSize: maxChunkSizeMB, enableBalancer: false}});
+const dbName = 'test';
+const coll = st.getDB(dbName).getCollection('foo');
+const ns = coll.getFullName();
+const mongos = st.s;
+const shard0 = st.shard0.shardName;
+const shard1 = st.shard1.shardName;
+
+// Shard collection with one chunk on shard0 [MinKey, 0) and one chunk on shard1 [0, MaxKey)
+assert.commandWorked(mongos.adminCommand({enablesharding: dbName, primaryShard: shard0}));
+assert.commandWorked(mongos.adminCommand({shardcollection: ns, key: {_id: 1}}));
+assert.commandWorked(mongos.adminCommand({split: ns, middle: {_id: 0}}));
+assert.commandWorked(mongos.adminCommand({moveChunk: ns, find: {_id: 0}, to: shard1}));
+
+const bigString = 'X'.repeat(1024 * 1024);  // 1MB
+
+// Insert 10MB of documents in range [MinKey, 0) on shard0
+var bulk = coll.initializeUnorderedBulkOp();
+for (var i = -1; i > -11; i--) {
+    bulk.insert({_id: i, s: bigString});
+}
+assert.commandWorked(bulk.execute());
+
+// Insert 3MB of documents in range [0, MaxKey) on shard1
+bulk = coll.initializeUnorderedBulkOp();
+for (var i = 0; i < 3; i++) {
+    bulk.insert({_id: i, s: bigString});
+}
+assert.commandWorked(bulk.execute());
+
+// Create 3 more chunks on shard1
+assert.commandWorked(mongos.adminCommand({split: ns, middle: {_id: 1}}));
+assert.commandWorked(mongos.adminCommand({split: ns, middle: {_id: 2}}));
+assert.commandWorked(mongos.adminCommand({split: ns, middle: {_id: 3}}));
+
+// At this point, the distribution of chunks for the testing collection is the following:
+// - On shard0 (10MB):
+//     { "_id" : { "$minKey" : 1 } } -->> { "_id" : 0 }
+// - On shard1 (3MB):
+//     { "_id" : 0 } -->> { "_id" : 1 }
+//     { "_id" : 1 } -->> { "_id" : 2 }
+//     { "_id" : 2 } -->> { "_id" : 3 }
+//     { "_id" : 3 } -->> { "_id" : { "$maxKey" : 1 } }
+jsTestLog("Printing sharding status before starting balancer");
+st.printShardingStatus();
+st.startBalancer();
+
+assert.soon(function() {
+    return assert.commandWorked(st.s0.adminCommand({balancerCollectionStatus: ns}))
+        .balancerCompliant;
+}, 'Timed out waiting for the collection to be balanced', 60000 /* timeout */, 1000 /* interval */);
+
+// Check that the collection size diff between shards is small (2 * maxChunkSize)
+const collSizeOnShard0BeforeNoopRounds = getCollSizeMB(ns, st.shard0.rs.getPrimary());
+const collSizeOnShard1BeforeNoopRounds = getCollSizeMB(ns, st.shard1.rs.getPrimary());
+const chunksBeforeNoopRound = findChunksUtil.findChunksByNs(st.config, ns).toArray();
+var errMsg = '[Before noop round] Data on shard0 = ' + collSizeOnShard0BeforeNoopRounds +
+    ' and data on shard 1 = ' + collSizeOnShard1BeforeNoopRounds +
+    ' - chunks before noop round = ' + JSON.stringify(chunksBeforeNoopRound);
+assert.lte(collSizeOnShard0BeforeNoopRounds - collSizeOnShard1BeforeNoopRounds,
+           2 * maxChunkSizeMB,
+           errMsg);
+
+// Wait for some more rounds and then check the balancer is not wrongly moving around data
+st.forEachConfigServer((conn) => {
+    conn.adminCommand({
+        configureFailPoint: 'overrideBalanceRoundInterval',
+        mode: 'alwaysOn',
+        data: {intervalMs: 100}
+    });
+});
+
+st.awaitBalancerRound();
+st.awaitBalancerRound();
+st.awaitBalancerRound();
+
+st.stopBalancer();
+jsTestLog("Printing sharding status after stopping balancer");
+st.printShardingStatus();
+
+const collSizeOnShard0AfterNoopRounds = getCollSizeMB(ns, st.shard0.rs.getPrimary());
+const collSizeOnShard1AfterNoopRounds = getCollSizeMB(ns, st.shard1.rs.getPrimary());
+const chunksAfterNoopRound = findChunksUtil.findChunksByNs(st.config, ns).toArray();
+errMsg = '[AFTER NOOP ROUND] Data on shard0 = ' + collSizeOnShard0AfterNoopRounds +
+    ' and data on shard 1 = ' + collSizeOnShard1AfterNoopRounds +
+    ' - chunks after noop round = ' + JSON.stringify(chunksAfterNoopRound);
+assert.eq(collSizeOnShard0BeforeNoopRounds, collSizeOnShard0AfterNoopRounds, errMsg);
+assert.eq(collSizeOnShard1BeforeNoopRounds, collSizeOnShard1AfterNoopRounds, errMsg);
+assert.eq(chunksBeforeNoopRound, chunksAfterNoopRound);
+
+st.stop();
+})();
diff --git a/jstests/sharding/balancing_sessions_collection.js b/jstests/sharding/balancing_sessions_collection.js
index 231f633dead..960b19693ee 100644
--- a/jstests/sharding/balancing_sessions_collection.js
+++ b/jstests/sharding/balancing_sessions_collection.js
@@ -6,6 +6,7 @@
 (function() {
 "use strict";
 
+load("jstests/libs/feature_flag_util.js");
 load("jstests/sharding/libs/find_chunks_util.js");
 
 // TODO SERVER-50144 Remove this and allow orphan checking.
@@ -112,6 +113,13 @@ const st = new ShardingTest({
     shards: numShards,
     other: {configOptions: {setParameter: {minNumChunksForSessionsCollection: kMinNumChunks}}}
 });
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
+
 const kSessionsNs = "config.system.sessions";
 const configDB = st.s.getDB("config");
diff --git a/jstests/sharding/enforce_zone_policy.js b/jstests/sharding/enforce_zone_policy.js
index 11a43d2572d..30413599a07 100644
--- a/jstests/sharding/enforce_zone_policy.js
+++ b/jstests/sharding/enforce_zone_policy.js
@@ -3,9 +3,17 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
 load("jstests/sharding/libs/find_chunks_util.js");
 
 var st = new ShardingTest({shards: 3, mongos: 1});
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
+
 assert.commandWorked(st.s0.adminCommand({enablesharding: 'test'}));
 st.ensurePrimaryShard('test', st.shard1.shardName);
diff --git a/jstests/sharding/migrateBig.js b/jstests/sharding/migrateBig.js
index 60306797dfd..0282e5afe63 100644
--- a/jstests/sharding/migrateBig.js
+++ b/jstests/sharding/migrateBig.js
@@ -1,7 +1,16 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
+
 var s = new ShardingTest({name: "migrateBig", shards: 2, other: {chunkSize: 1}});
 
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(s.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    s.stop();
+    return;
+}
+
 assert.commandWorked(
     s.config.settings.update({_id: "balancer"}, {$set: {_waitForDelete: true}}, true));
diff --git a/jstests/sharding/move_chunk_allowMigrations.js b/jstests/sharding/move_chunk_allowMigrations.js
index 6395a7a7424..93eb2b937e2 100644
--- a/jstests/sharding/move_chunk_allowMigrations.js
+++ b/jstests/sharding/move_chunk_allowMigrations.js
@@ -11,6 +11,7 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
 load('jstests/libs/fail_point_util.js');
 load('jstests/libs/parallel_shell_helpers.js');
 load("jstests/sharding/libs/find_chunks_util.js");
@@ -18,6 +19,13 @@ load("jstests/sharding/libs/shard_versioning_util.js");
 const st = new ShardingTest({shards: 2});
 const configDB = st.s.getDB("config");
 
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
+
 // Resets database dbName and enables sharding and establishes shard0 as primary, test case agnostic
 function setUpDatabaseAndEnableSharding(dbName) {
diff --git a/jstests/sharding/move_chunk_permitMigrations.js b/jstests/sharding/move_chunk_permitMigrations.js
index e984a9fbdfb..f269eb7757c 100644
--- a/jstests/sharding/move_chunk_permitMigrations.js
+++ b/jstests/sharding/move_chunk_permitMigrations.js
@@ -10,12 +10,20 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
 load('jstests/libs/fail_point_util.js');
 load('jstests/libs/parallel_shell_helpers.js');
 load("jstests/sharding/libs/find_chunks_util.js");
 load("jstests/sharding/libs/shard_versioning_util.js");
 
 const st = new ShardingTest({shards: 2});
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
+
 const configDB = st.s.getDB("config");
 const dbName = 'AllowMigrations';
diff --git a/jstests/sharding/zone_changes_hashed.js b/jstests/sharding/zone_changes_hashed.js
index 83265fac92f..6bc975ea783 100644
--- a/jstests/sharding/zone_changes_hashed.js
+++ b/jstests/sharding/zone_changes_hashed.js
@@ -4,6 +4,7 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
 load("jstests/sharding/libs/zone_changes_util.js");
 load("jstests/sharding/libs/find_chunks_util.js");
 
@@ -48,6 +49,13 @@ function findHighestChunkBounds(chunkBounds) {
 }
 
 let st = new ShardingTest({shards: 3});
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
 let primaryShard = st.shard0;
 let dbName = "test";
 let testDB = st.s.getDB(dbName);
diff --git a/jstests/sharding/zone_changes_range.js b/jstests/sharding/zone_changes_range.js
index 2f2963da220..751c5984568 100644
--- a/jstests/sharding/zone_changes_range.js
+++ b/jstests/sharding/zone_changes_range.js
@@ -4,10 +4,18 @@
 (function() {
 'use strict';
 
+load("jstests/libs/feature_flag_util.js");
 load("jstests/sharding/libs/zone_changes_util.js");
 load("jstests/sharding/libs/find_chunks_util.js");
 
 let st = new ShardingTest({shards: 3});
+// TODO SERVER-66378 adapt this test for data size aware balancing
+if (FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'),
+                              "BalanceAccordingToDataSize")) {
+    jsTestLog("Skipping as featureFlagBalanceAccordingToDataSize is enabled");
+    st.stop();
+    return;
+}
 let primaryShard = st.shard0;
 let dbName = "test";
 let testDB = st.s.getDB(dbName);
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 3a28317fe61..63046d18b7a 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -381,7 +381,6 @@ env.Library(
         'flush_routing_table_cache_updates_command.cpp',
         'get_database_version_command.cpp',
         'get_shard_version_command.cpp',
-        'get_stats_for_balancing.idl',
         'migration_chunk_cloner_source_legacy_commands.cpp',
        'migration_destination_manager_legacy_commands.cpp',
        'move_primary_coordinator_document.idl',
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 37ebe25150c..eaf28d3eaf1 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -408,7 +408,6 @@ Status Balancer::moveRange(OperationContext* opCtx,
         return std::tuple<ShardId, BSONObj>{chunk.getShardId(), chunk.getMin()};
     }();
 
-
     if (fromShardId == request.getToShard()) {
         return Status::OK();
     }
@@ -984,10 +983,15 @@ int Balancer::_moveChunks(OperationContext* opCtx,
     std::vector<std::pair<const MigrateInfo&, SemiFuture<void>>>
         rebalanceMigrationsAndResponses, defragmentationMigrationsAndResponses;
     auto requestMigration = [&](const MigrateInfo& migrateInfo) -> SemiFuture<void> {
-        auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
-            opCtx, migrateInfo.nss, repl::ReadConcernLevel::kMajorityReadConcern);
-        auto maxChunkSizeBytes =
-            coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes());
+        auto maxChunkSizeBytes = [&]() {
+            if (migrateInfo.optMaxChunkSizeBytes.has_value()) {
+                return *migrateInfo.optMaxChunkSizeBytes;
+            }
+
+            auto coll = Grid::get(opCtx)->catalogClient()->getCollection(
+                opCtx, migrateInfo.nss, repl::ReadConcernLevel::kMajorityReadConcern);
+            return coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes());
+        }();
 
         if (serverGlobalParams.featureCompatibility.isLessThan(
                 multiversion::FeatureCompatibilityVersion::kVersion_6_0)) {
@@ -1001,18 +1005,14 @@ int Balancer::_moveChunks(OperationContext* opCtx,
         MoveRangeRequestBase requestBase(migrateInfo.to);
         requestBase.setWaitForDelete(balancerConfig->waitForDelete());
         requestBase.setMin(migrateInfo.minKey);
-        if (!feature_flags::gNoMoreAutoSplitter.isEnabled(
-                serverGlobalParams.featureCompatibility)) {
-            // Issue the equivalent of a `moveChunk` if the auto-splitter is enabled
-            requestBase.setMax(migrateInfo.maxKey);
-        }
+        requestBase.setMax(migrateInfo.maxKey);
 
         ShardsvrMoveRange shardSvrRequest(migrateInfo.nss);
         shardSvrRequest.setDbName(NamespaceString::kAdminDb);
         shardSvrRequest.setMoveRangeRequestBase(requestBase);
         shardSvrRequest.setMaxChunkSizeBytes(maxChunkSizeBytes);
         shardSvrRequest.setFromShard(migrateInfo.from);
-        shardSvrRequest.setEpoch(coll.getEpoch());
+        shardSvrRequest.setEpoch(migrateInfo.version.epoch());
 
         const auto [secondaryThrottle, wc] =
             getSecondaryThrottleAndWriteConcern(balancerConfig->getSecondaryThrottle());
         shardSvrRequest.setSecondaryThrottle(secondaryThrottle);
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
index 8a361cbf912..410be70aff1 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.cpp
@@ -39,6 +39,7 @@
 #include "mongo/base/status_with.h"
 #include "mongo/bson/bsonobj_comparator_interface.h"
 #include "mongo/db/s/sharding_config_server_parameters_gen.h"
+#include "mongo/db/s/sharding_util.h"
 #include "mongo/logv2/log.h"
 #include "mongo/platform/bits.h"
 #include "mongo/s/balancer_configuration.h"
@@ -47,6 +48,8 @@
 #include "mongo/s/catalog/type_tags.h"
 #include "mongo/s/catalog_cache.h"
 #include "mongo/s/grid.h"
+#include "mongo/s/request_types/get_stats_for_balancing_gen.h"
+#include "mongo/s/sharding_feature_flags_gen.h"
 #include "mongo/util/str.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
@@ -100,6 +103,84 @@ StatusWith<DistributionStatus> createCollectionDistributionStatus(
     return {std::move(distribution)};
 }
 
+stdx::unordered_map<NamespaceString, CollectionDataSizeInfoForBalancing>
+getDataSizeInfoForCollections(OperationContext* opCtx,
+                              const std::vector<CollectionType>& collections) {
+    const auto balancerConfig = Grid::get(opCtx)->getBalancerConfiguration();
+    uassertStatusOK(balancerConfig->refreshAndCheck(opCtx));
+
+    const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+    const auto shardIds = shardRegistry->getAllShardIds(opCtx);
+
+    // Map to be returned, incrementally populated with the collected statistics
+    stdx::unordered_map<NamespaceString, CollectionDataSizeInfoForBalancing> dataSizeInfoMap;
+
+    std::vector<NamespaceWithOptionalUUID> namespacesWithUUIDsForStatsRequest;
+    for (const auto& coll : collections) {
+        const auto& nss = coll.getNss();
+        const auto maxChunkSizeBytes =
+            coll.getMaxChunkSizeBytes().value_or(balancerConfig->getMaxChunkSizeBytes());
+
+        dataSizeInfoMap.emplace(
+            nss,
+            CollectionDataSizeInfoForBalancing(std::map<ShardId, int64_t>(), maxChunkSizeBytes));
+
+        NamespaceWithOptionalUUID nssWithUUID(nss);
+        nssWithUUID.setUUID(coll.getUuid());
+        namespacesWithUUIDsForStatsRequest.push_back(nssWithUUID);
+    }
+
+    ShardsvrGetStatsForBalancing req{namespacesWithUUIDsForStatsRequest};
+    req.setScaleFactor(1);
+    const auto reqObj = req.toBSON({});
+
+    const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+    const auto responsesFromShards =
+        sharding_util::sendCommandToShards(opCtx,
+                                           NamespaceString::kAdminDb.toString(),
+                                           reqObj,
+                                           shardIds,
+                                           executor,
+                                           false /* throwOnError */);
+
+    for (auto&& response : responsesFromShards) {
+        try {
+            const auto& shardId = response.shardId;
+            const auto errorContext =
+                "Failed to get stats for balancing from shard '{}'"_format(shardId.toString());
+            const auto responseValue =
+                uassertStatusOKWithContext(std::move(response.swResponse), errorContext);
+
+            const ShardsvrGetStatsForBalancingReply reply =
+                ShardsvrGetStatsForBalancingReply::parse(
+                    IDLParserErrorContext("ShardsvrGetStatsForBalancingReply"),
+                    std::move(responseValue.data));
+            const auto collStatsFromShard = reply.getStats();
+
+            invariant(collStatsFromShard.size() == collections.size());
+            for (const auto& stats : collStatsFromShard) {
+                invariant(dataSizeInfoMap.contains(stats.getNs()));
+                dataSizeInfoMap.at(stats.getNs()).shardToDataSizeMap[shardId] = stats.getCollSize();
+            }
+        } catch (const ExceptionFor<ErrorCodes::ShardNotFound>& ex) {
+            // Handle `removeShard`: skip shards removed during a balancing round
+            LOGV2_DEBUG(6581603,
+                        1,
+                        "Skipping shard for the current balancing round",
+                        "error"_attr = redact(ex));
+        }
+    }
+
+    return dataSizeInfoMap;
+}
+
+const CollectionDataSizeInfoForBalancing getDataSizeInfoForCollection(OperationContext* opCtx,
+                                                                      const NamespaceString& nss) {
+    const auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss);
+    std::vector<CollectionType> vec{coll};
+    return std::move(getDataSizeInfoForCollections(opCtx, vec).at(nss));
+}
+
 /**
  * Helper class used to accumulate the split points for the same chunk together so they can be
  * submitted to the shard as a single call versus multiple. This is necessary in order to avoid
@@ -345,15 +426,17 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
 
     std::shuffle(collections.begin(), collections.end(), _random);
 
-    for (const auto& coll : collections) {
-        const NamespaceString& nss(coll.getNss());
+    static constexpr auto kStatsForBalancingBatchSize = 20;
+    std::vector<CollectionType> collBatch;
+    for (auto collIt = collections.begin(); collIt != collections.end();) {
+        const auto& coll = *(collIt++);
 
         if (!coll.getAllowBalance() || !coll.getAllowMigrations() || !coll.getPermitMigrations() ||
             coll.getDefragmentCollection()) {
             LOGV2_DEBUG(5966401,
                         1,
                         "Not balancing explicitly disabled collection",
-                        "namespace"_attr = nss,
+                        "namespace"_attr = coll.getNss(),
                         "allowBalance"_attr = coll.getAllowBalance(),
                         "allowMigrations"_attr = coll.getAllowMigrations(),
                         "permitMigrations"_attr = coll.getPermitMigrations(),
@@ -361,23 +444,47 @@ StatusWith<MigrateInfoVector> BalancerChunkSelectionPolicyImpl::selectChunksToMo
             continue;
        }
 
-        auto candidatesStatus =
-            _getMigrateCandidatesForCollection(opCtx, nss, shardStats, usedShards);
-        if (candidatesStatus == ErrorCodes::NamespaceNotFound) {
-            // Namespace got dropped before we managed to get to it, so just skip it
-            continue;
-        } else if (!candidatesStatus.isOK()) {
-            LOGV2_WARNING(21853,
-                          "Unable to balance collection {namespace}: {error}",
-                          "Unable to balance collection",
-                          "namespace"_attr = nss.ns(),
-                          "error"_attr = candidatesStatus.getStatus());
+        collBatch.push_back(coll);
+        if (collBatch.size() < kStatsForBalancingBatchSize && collIt < collections.end()) {
+            // Keep accumulating in the batch
             continue;
         }
 
-        candidateChunks.insert(candidateChunks.end(),
-                               std::make_move_iterator(candidatesStatus.getValue().first.begin()),
-                               std::make_move_iterator(candidatesStatus.getValue().first.end()));
+        boost::optional<stdx::unordered_map<NamespaceString, CollectionDataSizeInfoForBalancing>>
+            collsDataSizeInfo;
+        if (feature_flags::gBalanceAccordingToDataSize.isEnabled(
+                serverGlobalParams.featureCompatibility)) {
+            collsDataSizeInfo.emplace(getDataSizeInfoForCollections(opCtx, collBatch));
+        }
+
+        for (const auto& collFromBatch : collBatch) {
+            const auto& nss = collFromBatch.getNss();
+
+            boost::optional<CollectionDataSizeInfoForBalancing> optDataSizeInfo;
+            if (collsDataSizeInfo.has_value()) {
+                optDataSizeInfo.emplace(std::move(collsDataSizeInfo->at(nss)));
+            }
+
+            auto candidatesStatus = _getMigrateCandidatesForCollection(
+                opCtx, nss, shardStats, optDataSizeInfo, usedShards);
+            if (candidatesStatus == ErrorCodes::NamespaceNotFound) {
+                // Namespace got dropped before we managed to get to it, so just skip it
+                continue;
+            } else if (!candidatesStatus.isOK()) {
+                LOGV2_WARNING(21853,
+                              "Unable to balance collection",
+                              "namespace"_attr = nss.ns(),
+                              "error"_attr = candidatesStatus.getStatus());
+                continue;
+            }
+
+            candidateChunks.insert(
+                candidateChunks.end(),
+                std::make_move_iterator(candidatesStatus.getValue().first.begin()),
+                std::make_move_iterator(candidatesStatus.getValue().first.end()));
+        }
+
+        collBatch.clear();
     }
 
     return candidateChunks;
@@ -398,7 +505,14 @@ StatusWith<MigrateInfosWithReason> BalancerChunkSelectionPolicyImpl::selectChunk
 
     stdx::unordered_set<ShardId> usedShards;
 
-    auto candidatesStatus = _getMigrateCandidatesForCollection(opCtx, nss, shardStats, &usedShards);
+    boost::optional<CollectionDataSizeInfoForBalancing> optCollDataSizeInfo;
+    if (feature_flags::gBalanceAccordingToDataSize.isEnabled(
+            serverGlobalParams.featureCompatibility)) {
+        optCollDataSizeInfo.emplace(getDataSizeInfoForCollection(opCtx, nss));
+    }
+
+    auto candidatesStatus = _getMigrateCandidatesForCollection(
+        opCtx, nss, shardStats, optCollDataSizeInfo, &usedShards);
     if (!candidatesStatus.isOK()) {
         return candidatesStatus.getStatus();
     }
@@ -521,6 +635,7 @@ BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
     OperationContext* opCtx,
     const NamespaceString& nss,
     const ShardStatisticsVector& shardStats,
+    const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
     stdx::unordered_set<ShardId>* usedShards) {
     auto routingInfoStatus =
         Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss);
@@ -577,6 +692,7 @@ BalancerChunkSelectionPolicyImpl::_getMigrateCandidatesForCollection(
     return BalancerPolicy::balance(
         shardStats,
         distribution,
+        collDataSizeInfo,
         usedShards,
         Grid::get(opCtx)->getBalancerConfiguration()->attemptToBalanceJumboChunks());
 }
diff --git a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
index 76febe0557c..adb3314aa12 100644
--- a/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
+++ b/src/mongo/db/s/balancer/balancer_chunk_selection_policy_impl.h
@@ -77,6 +77,7 @@ private:
         OperationContext* opCtx,
         const NamespaceString& nss,
         const ShardStatisticsVector& shardStats,
+        const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
         stdx::unordered_set<ShardId>* usedShards);
 
     // Source for obtaining cluster statistics. Not owned and must not be destroyed before the
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 74ffe76866b..807c988ad9e 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -214,11 +214,12 @@ SemiFuture<void> BalancerCommandsSchedulerImpl::requestMoveChunk(
     auto externalClientInfo =
         issuedByRemoteUser ? boost::optional<ExternalClientInfo>(opCtx) : boost::none;
 
+    invariant(migrateInfo.maxKey.has_value(), "Bound not present when requesting move chunk");
     auto commandInfo = std::make_shared<MoveChunkCommandInfo>(migrateInfo.nss,
                                                               migrateInfo.from,
                                                               migrateInfo.to,
                                                               migrateInfo.minKey,
-                                                              migrateInfo.maxKey,
+                                                              *migrateInfo.maxKey,
                                                               commandSettings.maxChunkSizeBytes,
                                                               commandSettings.secondaryThrottle,
                                                               commandSettings.waitForDelete,
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index a2f2dc56a29..412141303c9 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -413,11 +413,12 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq
     ASSERT_EQ(kNss, recoveredCommand->getNameSpace());
     ASSERT_EQ(migrateInfo.from, recoveredCommand->getTarget());
     ASSERT_TRUE(recoveredCommand->requiresDistributedLock());
+
     MoveChunkCommandInfo originalCommandInfo(migrateInfo.nss,
                                              migrateInfo.from,
                                              migrateInfo.to,
                                              migrateInfo.minKey,
-                                             migrateInfo.maxKey,
+                                             *migrateInfo.maxKey,
                                              requestSettings.maxChunkSizeBytes,
                                              requestSettings.secondaryThrottle,
                                              requestSettings.waitForDelete,
@@ -444,7 +445,7 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering
     auto requestSettings = getMoveChunkSettings(kCustomizedMaxChunkSizeBytes);
     MigrationType recoveryInfo(migrateInfo.nss,
                                migrateInfo.minKey,
-                               migrateInfo.maxKey,
+                               *migrateInfo.maxKey,
                                migrateInfo.from,
                                migrateInfo.to,
                                migrateInfo.version,
diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
index f47c899cee7..e26d4b9d26f 100644
--- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_test.cpp
@@ -630,7 +630,7 @@ TEST_F(BalancerDefragmentationPolicyTest, TestPhaseTwoChunkCanBeMovedAndMergedWi
     ASSERT_EQ(biggestChunk.getShard(), moveAction.from);
     ASSERT_EQ(smallestChunk.getShard(), moveAction.to);
     ASSERT_BSONOBJ_EQ(biggestChunk.getMin(), moveAction.minKey);
-    ASSERT_BSONOBJ_EQ(biggestChunk.getMax(), moveAction.maxKey);
+    ASSERT_BSONOBJ_EQ(biggestChunk.getMax(), *moveAction.maxKey);
 
     auto nextAction = _defragmentationPolicy.getNextStreamingAction(operationContext());
     ASSERT_TRUE(nextAction == boost::none);
diff --git a/src/mongo/db/s/balancer/balancer_policy.cpp b/src/mongo/db/s/balancer/balancer_policy.cpp
index 50f54efd626..a18c38d7c8c 100644
--- a/src/mongo/db/s/balancer/balancer_policy.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy.cpp
@@ -284,13 +284,16 @@ Status BalancerPolicy::isShardSuitableReceiver(const ClusterStatistics::ShardSta
     return Status::OK();
 }
 
-ShardId BalancerPolicy::_getLeastLoadedReceiverShard(
+std::tuple<ShardId, int64_t> BalancerPolicy::_getLeastLoadedReceiverShard(
     const ShardStatisticsVector& shardStats,
     const DistributionStatus& distribution,
+    const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
     const string& tag,
     const stdx::unordered_set<ShardId>& excludedShards) {
     ShardId best;
-    unsigned minChunks = numeric_limits<unsigned>::max();
+    unsigned currentMin = numeric_limits<unsigned>::max();
+
+    const auto shouldBalanceAccordingToDataSize = collDataSizeInfo.has_value();
 
     for (const auto& stat : shardStats) {
         if (excludedShards.count(stat.shardId))
@@ -301,40 +304,68 @@ ShardId BalancerPolicy::_getLeastLoadedReceiverShard(
             continue;
         }
 
-        unsigned myChunks = distribution.numberOfChunksInShard(stat.shardId);
-        if (myChunks >= minChunks) {
-            continue;
-        }
+        if (shouldBalanceAccordingToDataSize) {
+            const auto& shardSizeIt = collDataSizeInfo->shardToDataSizeMap.find(stat.shardId);
+            if (shardSizeIt == collDataSizeInfo->shardToDataSizeMap.end()) {
+                // Skip if stats not available (may happen if add|remove shard during a round)
+                continue;
+            }
 
-        best = stat.shardId;
-        minChunks = myChunks;
+            const auto shardSize = shardSizeIt->second;
+            if (shardSize < currentMin) {
+                best = stat.shardId;
+                currentMin = shardSize;
+            }
+        } else {
+            unsigned myChunks = distribution.numberOfChunksInShard(stat.shardId);
+            if (myChunks < currentMin) {
+                best = stat.shardId;
+                currentMin = myChunks;
+            }
+        }
     }
 
-    return best;
+    return {best, currentMin};
 }
 
-ShardId BalancerPolicy::_getMostOverloadedShard(
+std::tuple<ShardId, int64_t> BalancerPolicy::_getMostOverloadedShard(
     const ShardStatisticsVector& shardStats,
     const DistributionStatus& distribution,
+    const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
     const string& chunkTag,
     const stdx::unordered_set<ShardId>& excludedShards) {
     ShardId worst;
-    unsigned maxChunks = 0;
+    long long currentMax = numeric_limits<long long>::min();
+
+    const auto shouldBalanceAccordingToDataSize = collDataSizeInfo.has_value();
 
     for (const auto& stat : shardStats) {
         if (excludedShards.count(stat.shardId))
             continue;
 
-        const unsigned shardChunkCount =
-            distribution.numberOfChunksInShardWithTag(stat.shardId, chunkTag);
-        if (shardChunkCount <= maxChunks)
-            continue;
+        if (shouldBalanceAccordingToDataSize) {
+            const auto& shardSizeIt = collDataSizeInfo->shardToDataSizeMap.find(stat.shardId);
+            if (shardSizeIt == collDataSizeInfo->shardToDataSizeMap.end()) {
+                // Skip if stats not available (may happen if add|remove shard during a round)
+                continue;
+            }
 
-        worst = stat.shardId;
-        maxChunks = shardChunkCount;
+            const auto shardSize = shardSizeIt->second;
+            if (shardSize > currentMax) {
+                worst = stat.shardId;
+                currentMax = shardSize;
+            }
+        } else {
+            const unsigned shardChunkCount =
+                distribution.numberOfChunksInShardWithTag(stat.shardId, chunkTag);
+            if (shardChunkCount > currentMax) {
+                worst = stat.shardId;
+                currentMax = shardChunkCount;
+            }
+        }
     }
 
-    return worst;
+    return {worst, currentMax};
 }
 
 // Returns a random integer in [0, max) using a uniform random distribution.
@@ -402,10 +433,12 @@ MigrateInfo chooseRandomMigration(const ShardStatisticsVector& shardStats,
             MoveChunkRequest::ForceJumbo::kDoNotForce};
 }
 
-MigrateInfosWithReason BalancerPolicy::balance(const ShardStatisticsVector& shardStats,
-                                               const DistributionStatus& distribution,
-                                               stdx::unordered_set<ShardId>* usedShards,
-                                               bool forceJumbo) {
+MigrateInfosWithReason BalancerPolicy::balance(
+    const ShardStatisticsVector& shardStats,
+    const DistributionStatus& distribution,
+    const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
+    stdx::unordered_set<ShardId>* usedShards,
+    bool forceJumbo) {
     vector<MigrateInfo> migrations;
     MigrationReason firstReason = MigrationReason::none;
 
@@ -449,8 +482,8 @@ MigrateInfosWithReason BalancerPolicy::balance(const ShardStatisticsVector& shar
 
             const string tag = distribution.getTagForChunk(chunk);
 
-            const ShardId to =
-                _getLeastLoadedReceiverShard(shardStats, distribution, tag, *usedShards);
+            const auto [to, _] = _getLeastLoadedReceiverShard(
+                shardStats, distribution, collDataSizeInfo, tag, *usedShards);
             if (!to.isValid()) {
                 if (migrations.empty()) {
                     LOGV2_WARNING(21889,
@@ -509,11 +542,12 @@ MigrateInfosWithReason BalancerPolicy::balance(const ShardStatisticsVector& shar
                               "Chunk violates zone, but it is jumbo and cannot be moved",
                               "chunk"_attr = redact(chunk.toString()),
                               "zone"_attr = redact(tag));
+
                 continue;
             }
 
-            const ShardId to =
-                _getLeastLoadedReceiverShard(shardStats, distribution, tag, *usedShards);
+            const auto [to, _] = _getLeastLoadedReceiverShard(
+                shardStats, distribution, collDataSizeInfo, tag, *usedShards);
             if (!to.isValid()) {
                 if (migrations.empty()) {
                     LOGV2_WARNING(21892,
@@ -548,9 +582,6 @@ MigrateInfosWithReason BalancerPolicy::balance(const ShardStatisticsVector& shar
         tagsPlusEmpty.push_back("");
 
     for (const auto& tag : tagsPlusEmpty) {
-        const size_t totalNumberOfChunksWithTag =
-            (tag.empty() ? distribution.totalChunks() : distribution.totalChunksWithTag(tag));
-
         size_t totalNumberOfShardsWithTag = 0;
 
         for (const auto& stat : shardStats) {
@@ -577,18 +608,31 @@ MigrateInfosWithReason BalancerPolicy::balance(const ShardStatisticsVector& shar
             continue;
         }
 
-        // Calculate the rounded optimal number of chunks per shard
-        const size_t idealNumberOfChunksPerShardForTag =
-            (size_t)std::roundf(totalNumberOfChunksWithTag / (float)totalNumberOfShardsWithTag);
-
-        while (_singleZoneBalance(shardStats,
-                                  distribution,
-                                  tag,
-                                  idealNumberOfChunksPerShardForTag,
-                                  &migrations,
-                                  usedShards,
-                                  forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
-                                             : MoveChunkRequest::ForceJumbo::kDoNotForce)) {
+        auto singleZoneBalance = [&]() {
+            if (collDataSizeInfo.has_value()) {
+                return _singleZoneBalanceBasedOnDataSize(
+                    shardStats,
+                    distribution,
+                    *collDataSizeInfo,
+                    tag,
+                    &migrations,
+                    usedShards,
+                    forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
+                               : MoveChunkRequest::ForceJumbo::kDoNotForce);
+            }
+
+            return _singleZoneBalanceBasedOnChunks(
+                shardStats,
+                distribution,
+                tag,
+                totalNumberOfShardsWithTag,
+                &migrations,
+                usedShards,
+                forceJumbo ? MoveChunkRequest::ForceJumbo::kForceBalancer
+                           : MoveChunkRequest::ForceJumbo::kDoNotForce);
+        };
+
+        while (singleZoneBalance()) {
             if (firstReason == MigrationReason::none) {
                 firstReason = MigrationReason::chunksImbalance;
             }
@@ -604,8 +648,11 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk(
     const DistributionStatus& distribution) {
     const string tag = distribution.getTagForChunk(chunk);
 
-    ShardId newShardId =
-        _getLeastLoadedReceiverShard(shardStats, distribution, tag, stdx::unordered_set<ShardId>());
+    const auto [newShardId, _] = _getLeastLoadedReceiverShard(shardStats,
+                                                              distribution,
+                                                              boost::none /* collDataSizeInfo */,
+                                                              tag,
+                                                              stdx::unordered_set<ShardId>());
     if (!newShardId.isValid() || newShardId == chunk.getShard()) {
         return boost::optional<MigrateInfo>();
     }
@@ -614,14 +661,21 @@ boost::optional<MigrateInfo> BalancerPolicy::balanceSingleChunk(
         newShardId, distribution.nss(), chunk, MoveChunkRequest::ForceJumbo::kDoNotForce);
 }
 
-bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
-                                        const DistributionStatus& distribution,
-                                        const string& tag,
-                                        size_t idealNumberOfChunksPerShardForTag,
-                                        vector<MigrateInfo>* migrations,
-                                        stdx::unordered_set<ShardId>* usedShards,
-                                        MoveChunkRequest::ForceJumbo forceJumbo) {
-    const ShardId from = _getMostOverloadedShard(shardStats, distribution, tag, *usedShards);
+bool BalancerPolicy::_singleZoneBalanceBasedOnChunks(const ShardStatisticsVector& shardStats,
+                                                     const DistributionStatus& distribution,
+                                                     const string& tag,
+                                                     size_t totalNumberOfShardsWithTag,
+                                                     vector<MigrateInfo>* migrations,
+                                                     stdx::unordered_set<ShardId>* usedShards,
+                                                     MoveChunkRequest::ForceJumbo forceJumbo) {
+    // Calculate the rounded optimal number of chunks per shard
+    const size_t totalNumberOfChunksWithTag =
+        (tag.empty() ? distribution.totalChunks() : distribution.totalChunksWithTag(tag));
+    const size_t idealNumberOfChunksPerShardForTag =
+        (size_t)std::roundf(totalNumberOfChunksWithTag / (float)totalNumberOfShardsWithTag);
+
+    const auto [from, fromSize] =
+        _getMostOverloadedShard(shardStats, distribution, boost::none, tag, *usedShards);
     if (!from.isValid())
         return false;
 
@@ -631,7 +685,8 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
     if (max <= idealNumberOfChunksPerShardForTag)
         return false;
 
-    const ShardId to = _getLeastLoadedReceiverShard(shardStats, distribution, tag, *usedShards);
+    const auto [to, toSize] =
+        _getLeastLoadedReceiverShard(shardStats, distribution, boost::none, tag, *usedShards);
     if (!to.isValid()) {
         if (migrations->empty()) {
             LOGV2(21882,
@@ -704,6 +759,87 @@ bool BalancerPolicy::_singleZoneBalance(const ShardStatisticsVector& shardStats,
     return false;
 }
 
+bool BalancerPolicy::_singleZoneBalanceBasedOnDataSize(
+    const ShardStatisticsVector& shardStats,
+    const DistributionStatus& distribution,
+    const CollectionDataSizeInfoForBalancing& collDataSizeInfo,
+    const string& tag,
+    vector<MigrateInfo>* migrations,
+    stdx::unordered_set<ShardId>* usedShards,
+    MoveChunkRequest::ForceJumbo forceJumbo) {
+    const auto [from, fromSize] =
+        _getMostOverloadedShard(shardStats, distribution, collDataSizeInfo, tag, *usedShards);
+    if (!from.isValid())
+        return false;
+
+    const auto [to, toSize] =
+        _getLeastLoadedReceiverShard(shardStats, distribution, collDataSizeInfo, tag, *usedShards);
+    if (!to.isValid()) {
+        if (migrations->empty()) {
+            LOGV2(6581600, "No available shards to take chunks for zone", "zone"_attr = tag);
+        }
+        return false;
+    }
+
+    if (from == to) {
+        return false;
+    }
+
+    LOGV2_DEBUG(6581601,
+                1,
+                "Balancing single zone",
+                "namespace"_attr = distribution.nss().ns(),
+                "zone"_attr = tag,
+                "fromShardId"_attr = from,
+                "fromShardDataSize"_attr = fromSize,
+                "toShardId"_attr = to,
+                "toShardDataSize"_attr = toSize,
+                "maxChunkSizeBytes"_attr = collDataSizeInfo.maxChunkSizeBytes);
+
+    if (fromSize - toSize < 2 * collDataSizeInfo.maxChunkSizeBytes) {
+        // Do not balance if the collection's size differs too little between the chosen shards
+        return false;
+    }
+
+    const vector<ChunkType>& chunks = distribution.getChunks(from);
+
+    unsigned numJumboChunks = 0;
+
+    for (const auto& chunk : chunks) {
+        if (distribution.getTagForChunk(chunk) != tag)
+            continue;
+
+        if (chunk.getJumbo()) {
+            numJumboChunks++;
+            continue;
+        }
+
+        migrations->emplace_back(to,
+                                 chunk.getShard(),
+                                 distribution.nss(),
+                                 chunk.getCollectionUUID(),
+                                 chunk.getMin(),
+                                 boost::none /* call moveRange */,
+                                 chunk.getVersion(),
+                                 forceJumbo,
+                                 collDataSizeInfo.maxChunkSizeBytes);
+        invariant(usedShards->insert(chunk.getShard()).second);
+        invariant(usedShards->insert(to).second);
+        return true;
+    }
+
+    if (numJumboChunks) {
+        LOGV2_WARNING(6581602,
+                      "Shard has only jumbo chunks for this collection and cannot be balanced",
+                      "namespace"_attr = distribution.nss().ns(),
+                      "shardId"_attr = from,
+                      "zone"_attr = tag,
+                      "numJumboChunks"_attr = numJumboChunks);
+    }
+
+    return false;
+}
+
 ZoneRange::ZoneRange(const BSONObj& a_min, const BSONObj& a_max, const std::string& _zone)
     : min(a_min.getOwned()), max(a_max.getOwned()), zone(_zone) {}
 
@@ -732,15 +868,17 @@ MigrateInfo::MigrateInfo(const ShardId& a_to,
                          const NamespaceString& a_nss,
                          const UUID& a_uuid,
                          const BSONObj& a_min,
-                         const BSONObj& a_max,
+                         const boost::optional<BSONObj>& a_max,
                          const ChunkVersion& a_version,
-                         const MoveChunkRequest::ForceJumbo a_forceJumbo)
+                         const MoveChunkRequest::ForceJumbo a_forceJumbo,
+                         boost::optional<int64_t> maxChunkSizeBytes)
     : nss(a_nss),
       uuid(a_uuid),
       minKey(a_min),
       maxKey(a_max),
       version(a_version),
-      forceJumbo(a_forceJumbo) {
+      forceJumbo(a_forceJumbo),
+      optMaxChunkSizeBytes(maxChunkSizeBytes) {
     invariant(a_to.isValid());
     invariant(a_from.isValid());
diff --git a/src/mongo/db/s/balancer/balancer_policy.h b/src/mongo/db/s/balancer/balancer_policy.h
index 008c1d93b7e..0a047098615 100644
--- a/src/mongo/db/s/balancer/balancer_policy.h
+++ b/src/mongo/db/s/balancer/balancer_policy.h
@@ -66,9 +66,10 @@ struct MigrateInfo {
                 const NamespaceString& a_nss,
                 const UUID& a_uuid,
                 const BSONObj& a_min,
-                const BSONObj& a_max,
+                const boost::optional<BSONObj>& a_max,
                 const ChunkVersion& a_version,
-                MoveChunkRequest::ForceJumbo a_forceJumbo);
+                MoveChunkRequest::ForceJumbo a_forceJumbo,
+                boost::optional<int64_t> maxChunkSizeBytes = boost::none);
 
     std::string getName() const;
 
@@ -81,9 +82,15 @@ struct MigrateInfo {
     ShardId to;
     ShardId from;
     BSONObj minKey;
-    BSONObj maxKey;
+
+    // May be optional in case of moveRange
+    boost::optional<BSONObj> maxKey;
+
     ChunkVersion version;
     MoveChunkRequest::ForceJumbo forceJumbo;
+
+    // Set only in case of data-size aware balancing
+    // TODO SERVER-65322 make `optMaxChunkSizeBytes` non-optional
+    boost::optional<int64_t> optMaxChunkSizeBytes;
 };
 
 enum MigrationReason { none, drain, zoneViolation, chunksImbalance };
@@ -205,6 +212,18 @@ typedef stdx::variant<Status, StatusWith<AutoSplitVectorResponse>, StatusWith<Da
 typedef std::vector<ClusterStatistics::ShardStatistics> ShardStatisticsVector;
 typedef std::map<ShardId, std::vector<ChunkType>> ShardToChunksMap;
 
+/*
+ * Keeps track of info needed for data size aware balancing.
+ */
+struct CollectionDataSizeInfoForBalancing {
+    CollectionDataSizeInfoForBalancing(std::map<ShardId, int64_t>&& shardToDataSizeMap,
+                                       long maxChunkSizeBytes)
+        : shardToDataSizeMap(std::move(shardToDataSizeMap)), maxChunkSizeBytes(maxChunkSizeBytes) {}
+
+    std::map<ShardId, int64_t> shardToDataSizeMap;
+    const int64_t maxChunkSizeBytes;
+};
+
 /**
  * Keeps track of zones for a collection.
  */
@@ -364,11 +383,11 @@ public:
                                      const std::string& chunkTag);
 
     /**
-     * Returns a suggested set of chunks to move whithin a collection's shards, given the specified
-     * state of the shards (draining, max size reached, etc) and the number of chunks for that
-     * collection. If the policy doesn't recommend anything to move, it returns an empty vector. The
-     * entries in the vector do are all for separate source/destination shards and as such do not
-     * need to be done serially and can be scheduled in parallel.
+     * Returns a suggested set of chunks or ranges to move within a collection's shards, given the
+     * specified state of the shards (draining, max size reached, etc) and the number of chunks or
+     * data size for that collection. If the policy doesn't recommend anything to move, it returns
+     * an empty vector. The entries in the vector are all for separate source/destination shards
+     * and as such do not need to be done serially and can be scheduled in parallel.
      *
      * The balancing logic calculates the optimum number of chunks per shard for each zone and if
     * any of the shards have chunks, which are sufficiently higher than this number, suggests
@@ -378,10 +397,12 @@ public:
     * used for migrations. Used so we don't return multiple conflicting migrations for the same
     * shard.
     */
-    static MigrateInfosWithReason balance(const ShardStatisticsVector& shardStats,
-                                          const DistributionStatus& distribution,
-                                          stdx::unordered_set<ShardId>* usedShards,
-                                          bool forceJumbo);
+    static MigrateInfosWithReason balance(
+        const ShardStatisticsVector& shardStats,
+        const DistributionStatus& distribution,
+        const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
+        stdx::unordered_set<ShardId>* usedShards,
+        bool forceJumbo);
 
     /**
      * Using the specified distribution information, returns a suggested better location for the
@@ -392,43 +413,69 @@ public:
                                                         const DistributionStatus& distribution);
 
 private:
-    /**
-     * Return the shard with the specified tag, which has the least number of chunks. If the tag is
-     * empty, considers all shards.
+    /*
+     * Only considers shards with the specified tag, all shards in case the tag is empty.
+     *
+     * Returns a tuple <ShardID, number of chunks> referring to the shard with the fewest chunks.
+     *
+     * If balancing based on collection size on shards:
+     *  - Returns a tuple <ShardID, amount of data in bytes> referring to the shard with the
+     *    least data.
     */
-    static ShardId _getLeastLoadedReceiverShard(const ShardStatisticsVector& shardStats,
-                                                const DistributionStatus& distribution,
-                                                const std::string& tag,
-                                                const stdx::unordered_set<ShardId>& excludedShards);
+    static std::tuple<ShardId, int64_t> _getLeastLoadedReceiverShard(
+        const ShardStatisticsVector& shardStats,
+        const DistributionStatus& distribution,
+        const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
+        const std::string& tag,
+        const stdx::unordered_set<ShardId>& excludedShards);
 
     /**
-     * Return the shard which has the least number of chunks with the specified tag. If the tag is
-     * empty, considers all chunks.
+     * Only considers shards with the specified tag, all shards in case the tag is empty.
+     *
+     * If balancing based on number of chunks:
+     *  - Returns a tuple <ShardID, number of chunks> referring to the shard with the most chunks.
+     *
+     * If balancing based on collection size on shards:
+     *  - Returns a tuple <ShardID, amount of data in bytes> referring to the shard with the
+     *    most data.
     */
-    static ShardId _getMostOverloadedShard(const ShardStatisticsVector& shardStats,
-                                           const DistributionStatus& distribution,
-                                           const std::string& chunkTag,
-                                           const stdx::unordered_set<ShardId>& excludedShards);
+    static std::tuple<ShardId, int64_t> _getMostOverloadedShard(
+        const ShardStatisticsVector& shardStats,
+        const DistributionStatus& distribution,
+        const boost::optional<CollectionDataSizeInfoForBalancing>& collDataSizeInfo,
+        const std::string& chunkTag,
+        const stdx::unordered_set<ShardId>& excludedShards);
 
     /**
      * Selects one chunk for the specified zone (if appropriate) to be moved in order to bring the
     * deviation of the shards chunk contents closer to even across all shards in the specified
     * zone. Takes into account and updates the shards, which have already been used for migrations.
     *
-     * The 'idealNumberOfChunksPerShardForTag' indicates what is the ideal number of chunks which
-     * each shard must have and is used to determine the imbalance and also to prevent chunks from
-     * moving when not necessary.
+     * Returns true if a migration was suggested, false otherwise. This method is intended to be
+     * called multiple times until all possible migrations for a zone have been selected.
 
     /**
      * Selects one chunk for the specified zone (if appropriate) to be moved in order to bring the
      * deviation of the shards chunk contents closer to even across all shards in the specified
      * zone. Takes into account and updates the shards, which have already been used for migrations.
      *
-     * The 'idealNumberOfChunksPerShardForTag' indicates what is the ideal number of chunks which
-     * each shard must have and is used to determine the imbalance and also to prevent chunks from
-     * moving when not necessary.
+     * Returns true if a migration was suggested, false otherwise. This method is intended to be
+     * called multiple times until all possible migrations for a zone have been selected.
+     */
+    static bool _singleZoneBalanceBasedOnChunks(const ShardStatisticsVector& shardStats,
+                                                const DistributionStatus& distribution,
+                                                const std::string& tag,
+                                                size_t totalNumberOfShardsWithTag,
+                                                std::vector<MigrateInfo>* migrations,
+                                                stdx::unordered_set<ShardId>* usedShards,
+                                                MoveChunkRequest::ForceJumbo forceJumbo);
+
+    /**
+     * Selects one range for the specified zone (if appropriate) to be moved in order to bring the
+     * deviation of the collection data size closer to even across all shards in the specified
+     * zone. Takes into account and updates the shards, which have already been used for migrations.
      *
      * Returns true if a migration was suggested, false otherwise. This method is intended to be
      * called multiple times until all possible migrations for a zone have been selected.
      */
-    static bool _singleZoneBalance(const ShardStatisticsVector& shardStats,
-                                   const DistributionStatus& distribution,
-                                   const std::string& tag,
-                                   size_t idealNumberOfChunksPerShardForTag,
-                                   std::vector<MigrateInfo>* migrations,
-                                   stdx::unordered_set<ShardId>* usedShards,
-                                   MoveChunkRequest::ForceJumbo forceJumbo);
+    static bool _singleZoneBalanceBasedOnDataSize(
+        const ShardStatisticsVector& shardStats,
+        const DistributionStatus& distribution,
+        const CollectionDataSizeInfoForBalancing& collDataSizeInfo,
+        const std::string& tag,
+        std::vector<MigrateInfo>* migrations,
+        stdx::unordered_set<ShardId>* usedShards,
+        MoveChunkRequest::ForceJumbo forceJumbo);
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/s/balancer/balancer_policy_test.cpp b/src/mongo/db/s/balancer/balancer_policy_test.cpp
index d32e163b090..fb98d610b00 100644
--- a/src/mongo/db/s/balancer/balancer_policy_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_policy_test.cpp
@@ -118,7 +118,8 @@ MigrateInfosWithReason balanceChunks(const ShardStatisticsVector& shardStats,
                                      bool shouldAggressivelyBalance,
                                      bool forceJumbo) {
     stdx::unordered_set<ShardId> usedShards;
-    return BalancerPolicy::balance(shardStats, distribution, &usedShards, forceJumbo);
+    return BalancerPolicy::balance(
+        shardStats, distribution, boost::none /* collDataSizeInfo */, &usedShards, forceJumbo);
 }
 
 TEST(BalancerPolicy, Basic) {
@@ -133,7 +134,7 @@ TEST(BalancerPolicy, Basic) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 }
@@ -149,7 +150,7 @@ TEST(BalancerPolicy, SmallClusterShouldBePerfectlyBalanced) {
     ASSERT_EQ(kShardId1, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 }
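Both zone-level helpers follow the driver pattern spelled out in the comments above: each call suggests at most one migration and reserves its donor and recipient, and the caller loops until no further candidate exists. A simplified standalone illustration of that loop (`selectOneMigration` is a hypothetical stand-in for the private helpers, which are not callable from outside the class):

```
#include <functional>
#include <string>
#include <unordered_set>
#include <vector>

using ShardId = std::string;               // stand-in for mongo::ShardId
struct MigrateInfo { ShardId from, to; };  // heavily reduced stand-in

// selectOneMigration appends at most one migration, marks both endpoints as
// used, and reports whether it found anything.
std::vector<MigrateInfo> balanceZone(
    const std::string& zone,
    std::unordered_set<ShardId>* usedShards,
    const std::function<bool(const std::string&,
                             std::vector<MigrateInfo>*,
                             std::unordered_set<ShardId>*)>& selectOneMigration) {
    std::vector<MigrateInfo> migrations;
    // Keep asking for one more migration until the zone has no candidates left;
    // since every round reserves a distinct donor/recipient pair in usedShards,
    // the resulting migrations touch disjoint shards and can run in parallel.
    while (selectOneMigration(zone, &migrations, usedShards)) {
    }
    return migrations;
}
```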
@@ -206,13 +207,13 @@ TEST(BalancerPolicy, ParallelBalancing) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 
     ASSERT_EQ(kShardId1, migrations[1].from);
     ASSERT_EQ(kShardId3, migrations[1].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[1].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[1].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), *migrations[1].maxKey);
 }
 
 TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) {
@@ -231,13 +232,13 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotPutChunksOnShardsAboveTheOptimal) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId4, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 
     ASSERT_EQ(kShardId1, migrations[1].from);
     ASSERT_EQ(kShardId5, migrations[1].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[1].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[1].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), *migrations[1].maxKey);
 }
 
 TEST(BalancerPolicy, ParallelBalancingDoesNotMoveChunksFromShardsBelowOptimal) {
@@ -254,7 +255,7 @@ TEST(BalancerPolicy, ParallelBalancingDoesNotMoveChunksFromShardsBelowOptimal) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId3, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 }
@@ -267,14 +268,18 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNe
     // Here kShardId0 would have been selected as a donor
     stdx::unordered_set<ShardId> usedShards{kShardId0};
-    const auto [migrations, reason] = BalancerPolicy::balance(
-        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false);
+    const auto [migrations, reason] =
+        BalancerPolicy::balance(cluster.first,
+                                DistributionStatus(kNamespace, cluster.second),
+                                boost::none /* collDataSizeInfo */,
+                                &usedShards,
+                                false);
 
     ASSERT_EQ(1U, migrations.size());
 
     ASSERT_EQ(kShardId1, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 }
@@ -287,8 +292,12 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseSourceShardsWithMoveNo
     // Here kShardId0 would have been selected as a donor
     stdx::unordered_set<ShardId> usedShards{kShardId0};
-    const auto [migrations, reason] = BalancerPolicy::balance(
-        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false);
+    const auto [migrations, reason] =
+        BalancerPolicy::balance(cluster.first,
+                                DistributionStatus(kNamespace, cluster.second),
+                                boost::none /* collDataSizeInfo */,
+                                &usedShards,
+                                false);
 
     ASSERT_EQ(0U, migrations.size());
 }
@@ -301,14 +310,18 @@ TEST(BalancerPolicy, ParallelBalancingNotSchedulingOnInUseDestinationShards) {
     // Here kShardId2 would have been selected as a recipient
     stdx::unordered_set<ShardId> usedShards{kShardId2};
-    const auto [migrations, reason] = BalancerPolicy::balance(
-        cluster.first, DistributionStatus(kNamespace, cluster.second), &usedShards, false);
+    const auto [migrations, reason] =
+        BalancerPolicy::balance(cluster.first,
+                                DistributionStatus(kNamespace, cluster.second),
+                                boost::none /* collDataSizeInfo */,
+                                &usedShards,
+                                false);
 
     ASSERT_EQ(1U, migrations.size());
 
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId3, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 }
@@ -328,7 +341,7 @@ TEST(BalancerPolicy, JumboChunksNotMoved) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 }
@@ -356,13 +369,13 @@ TEST(BalancerPolicy, JumboChunksNotMovedParallel) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::chunksImbalance, reason);
 
     ASSERT_EQ(kShardId2, migrations[1].from);
     ASSERT_EQ(kShardId3, migrations[1].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMin(), migrations[1].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMax(), migrations[1].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][2].getMax(), *migrations[1].maxKey);
 }
 
 TEST(BalancerPolicy, DrainingSingleChunk) {
@@ -377,7 +390,7 @@ TEST(BalancerPolicy, DrainingSingleChunk) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::drain, reason);
 }
@@ -396,13 +409,13 @@ TEST(BalancerPolicy, DrainingSingleChunkPerShard) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::drain, reason);
 
     ASSERT_EQ(kShardId2, migrations[1].from);
     ASSERT_EQ(kShardId3, migrations[1].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[1].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[1].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), *migrations[1].maxKey);
 }
 
 TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) {
@@ -417,7 +430,7 @@ TEST(BalancerPolicy, DrainingWithTwoChunksFirstOneSelected) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::drain, reason);
 }
@@ -436,7 +449,7 @@ TEST(BalancerPolicy, DrainingMultipleShardsFirstOneSelected) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::drain, reason);
 }
@@ -467,7 +480,7 @@ TEST(BalancerPolicy, DrainingSingleAppropriateShardFoundDueToTag) {
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::drain, reason);
 }
@@ -512,7 +525,7 @@ TEST(BalancerPolicy, BalancerRespectsMaxShardSizeOnlyBalanceToNonMaxed) {
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), *migrations[0].maxKey);
 }
 
 TEST(BalancerPolicy, BalancerRespectsMaxShardSizeWhenAllBalanced) {
@@ -545,7 +558,7 @@ TEST(BalancerPolicy, BalancerRespectsTagsWhenDraining) {
     ASSERT_EQ(kShardId1, migrations[0].from);
     ASSERT_EQ(kShardId0, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId1][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::drain, reason);
 }
@@ -565,7 +578,7 @@ TEST(BalancerPolicy, BalancerRespectsTagPolicyBeforeImbalance) {
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId0, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::zoneViolation, reason);
 }
@@ -586,7 +599,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsWithCrossShardViolationOfTags) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::zoneViolation, reason);
 }
@@ -605,7 +618,7 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedCluster) {
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId0, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::zoneViolation, reason);
 }
@@ -639,7 +652,7 @@ TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTags) {
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId1, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][1].getMax(), *migrations[0].maxKey);
 }
 
 TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTagsSkipTagWithShardInUse) {
@@ -657,14 +670,14 @@ TEST(BalancerPolicy, BalancerMostOverLoadShardHasMultipleTagsSkipTagWithShardInU
     ASSERT_OK(distribution.addRangeToZone(ZoneRange(BSON("x" << 3), BSON("x" << 5), "c")));
 
     stdx::unordered_set<ShardId> usedShards{kShardId1};
-    const auto [migrations, reason] =
-        BalancerPolicy::balance(cluster.first, distribution, &usedShards, false);
+    const auto [migrations, reason] = BalancerPolicy::balance(
+        cluster.first, distribution, boost::none /* collDataSizeInfo */, &usedShards, false);
     ASSERT_EQ(1U, migrations.size());
 
     ASSERT_EQ(kShardId0, migrations[0].from);
     ASSERT_EQ(kShardId2, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][3].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][3].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId0][3].getMax(), *migrations[0].maxKey);
 }
 
 TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParallel) {
@@ -684,13 +697,13 @@ TEST(BalancerPolicy, BalancerFixesIncorrectTagsInOtherwiseBalancedClusterParalle
     ASSERT_EQ(kShardId2, migrations[0].from);
     ASSERT_EQ(kShardId0, migrations[0].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMin(), migrations[0].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), migrations[0].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId2][0].getMax(), *migrations[0].maxKey);
     ASSERT_EQ(MigrationReason::zoneViolation, reason);
 
     ASSERT_EQ(kShardId3, migrations[1].from);
     ASSERT_EQ(kShardId1, migrations[1].to);
     ASSERT_BSONOBJ_EQ(cluster.second[kShardId3][0].getMin(), migrations[1].minKey);
-    ASSERT_BSONOBJ_EQ(cluster.second[kShardId3][0].getMax(), migrations[1].maxKey);
+    ASSERT_BSONOBJ_EQ(cluster.second[kShardId3][0].getMax(), *migrations[1].maxKey);
     ASSERT_EQ(MigrationReason::zoneViolation, reason);
 }
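The mechanical `*migrations[0].maxKey` changes above follow from `MigrateInfo::maxKey` becoming `boost::optional`: chunk-based migrations always carry an upper bound, so the tests dereference unconditionally, while generic callers should check for engagement first. A small standalone sketch of that caller-side check (std::string standing in for BSONObj; not part of the patch):

```
#include <boost/optional.hpp>
#include <iostream>
#include <string>

// Reduced stand-in: the real struct stores BSONObj bounds.
struct MigrateInfo {
    std::string minKey;
    boost::optional<std::string> maxKey;  // may be unset for a moveRange request
};

void logMigration(const MigrateInfo& m) {
    // Check before dereferencing: only chunk-based migrations are guaranteed
    // to carry an upper bound.
    std::cout << "min: " << m.minKey << ", max: "
              << (m.maxKey ? *m.maxKey : std::string("<unbounded>")) << "\n";
}

int main() {
    logMigration({"{_id: 0}", std::string("{_id: 10}")});
    logMigration({"{_id: 10}", boost::none});
}
```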
diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp
index 6e66978dcd2..fde594f35cb 100644
--- a/src/mongo/db/s/sharding_util.cpp
+++ b/src/mongo/db/s/sharding_util.cpp
@@ -66,7 +66,8 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
     StringData dbName,
     const BSONObj& command,
     const std::vector<ShardId>& shardIds,
-    const std::shared_ptr<executor::TaskExecutor>& executor) {
+    const std::shared_ptr<executor::TaskExecutor>& executor,
+    const bool throwOnError) {
     std::vector<AsyncRequestsSender::Request> requests;
     for (const auto& shardId : shardIds) {
         requests.emplace_back(shardId, command);
@@ -91,17 +92,20 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
         // Retrieve the responses and throw at the first failure.
         auto response = ars.next();
 
-        const auto errorContext = "Failed command {} for database '{}' on shard '{}'"_format(
-            command.toString(), dbName, StringData{response.shardId});
+        if (throwOnError) {
+            const auto errorContext =
+                "Failed command {} for database '{}' on shard '{}'"_format(
+                    command.toString(), dbName, StringData{response.shardId});
 
-        auto shardResponse =
-            uassertStatusOKWithContext(std::move(response.swResponse), errorContext);
+            auto shardResponse =
+                uassertStatusOKWithContext(std::move(response.swResponse), errorContext);
 
-        auto status = getStatusFromCommandResult(shardResponse.data);
-        uassertStatusOKWithContext(status, errorContext);
+            auto status = getStatusFromCommandResult(shardResponse.data);
+            uassertStatusOKWithContext(status, errorContext);
 
-        auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
-        uassertStatusOKWithContext(wcStatus, errorContext);
+            auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+            uassertStatusOKWithContext(wcStatus, errorContext);
+        }
 
         responses.push_back(std::move(response));
     }
diff --git a/src/mongo/db/s/sharding_util.h b/src/mongo/db/s/sharding_util.h
index 3484b680748..c5021b4d46f 100644
--- a/src/mongo/db/s/sharding_util.h
+++ b/src/mongo/db/s/sharding_util.h
@@ -50,14 +50,16 @@ void tellShardsToRefreshCollection(OperationContext* opCtx,
                                    const std::shared_ptr<executor::TaskExecutor>& executor);
 
 /**
- * Generic utility to send a command to a list of shards. Throws if one of the commands fails.
+ * Generic utility to send a command to a list of shards. If `throwOnError=true`, throws as soon as
+ * one of the commands fails.
 */
 std::vector<AsyncRequestsSender::Response> sendCommandToShards(
     OperationContext* opCtx,
     StringData dbName,
     const BSONObj& command,
     const std::vector<ShardId>& shardIds,
-    const std::shared_ptr<executor::TaskExecutor>& executor);
+    const std::shared_ptr<executor::TaskExecutor>& executor,
+    bool throwOnError = true);
 
 /**
  * Unset the `noAutosplit` and `maxChunkSizeBytes` fields from:
diff --git a/src/mongo/db/s/shardsvr_get_stats_for_balancing_command.cpp b/src/mongo/db/s/shardsvr_get_stats_for_balancing_command.cpp
index d3057c4c088..f7ac383144f 100644
--- a/src/mongo/db/s/shardsvr_get_stats_for_balancing_command.cpp
+++ b/src/mongo/db/s/shardsvr_get_stats_for_balancing_command.cpp
@@ -32,10 +32,10 @@
 #include "mongo/db/commands.h"
 #include "mongo/db/db_raii.h"
 #include "mongo/db/s/balancer_stats_registry.h"
-#include "mongo/db/s/get_stats_for_balancing_gen.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/logv2/log.h"
 #include "mongo/s/grid.h"
+#include "mongo/s/request_types/get_stats_for_balancing_gen.h"
 #include "mongo/s/sharding_feature_flags_gen.h"
 
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 0851cd5e890..53f366319e3 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -187,6 +187,7 @@ env.Library(
         'refine_collection_shard_key_coordinator_feature_flags.idl',
         'request_types/abort_reshard_collection.idl',
         'request_types/add_shard_request_type.cpp',
+        'request_types/get_stats_for_balancing.idl',
         'request_types/add_shard_to_zone_request_type.cpp',
         'request_types/auto_split_vector.idl',
         'request_types/balance_chunk_request_type.cpp',
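The new `throwOnError=false` mode turns `sendCommandToShards` into a best-effort fan-out: every response is returned and the caller decides how to handle per-shard failures, which suits a balancer that polls shards for stats and should not abort a round because one shard is unreachable. A simplified standalone sketch of such caller-side handling (types reduced; not the actual balancer code):

```
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Reduced stand-in for AsyncRequestsSender::Response plus the status checks
// that sendCommandToShards performs itself when throwOnError is true.
struct Response {
    std::string shardId;
    bool ok;
    int64_t dataSizeBytes = 0;
};

int64_t totalSizeIgnoringFailures(const std::vector<Response>& responses) {
    int64_t total = 0;
    for (const auto& r : responses) {
        if (!r.ok) {
            // Tolerated: with throwOnError=false the helper no longer uasserts,
            // so a single failed shard does not abort the whole round.
            std::cerr << "Skipping failed response from " << r.shardId << "\n";
            continue;
        }
        total += r.dataSizeBytes;
    }
    return total;
}

int main() {
    std::cout << totalSizeIgnoringFailures({{"shard0", true, 100}, {"shard1", false}}) << "\n";
}
```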
b/src/mongo/s/request_types/get_stats_for_balancing.idl
diff --git a/src/mongo/shell/shardingtest.js b/src/mongo/shell/shardingtest.js
index e505085c731..d17ff1cfe10 100644
--- a/src/mongo/shell/shardingtest.js
+++ b/src/mongo/shell/shardingtest.js
@@ -642,11 +642,13 @@ var ShardingTest = function(params) {
     this.awaitBalance = function(collName, dbName, timeToWait) {
         timeToWait = timeToWait || 60000;
 
+        const mongos = this.s;
         assert.soon(function() {
-            var x = self.chunkDiff(collName, dbName);
-            print("chunk diff: " + x);
-            return x < 2;
-        }, "no balance happened", timeToWait);
+            return assert
+                .commandWorked(
+                    mongos.adminCommand({balancerCollectionStatus: dbName + '.' + collName}))
+                .balancerCompliant;
+        }, 'Timed out waiting for the collection to be balanced', timeToWait /* timeout */);
     };
 
     this.getShard = function(coll, query, includeEmpty) {