author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-04-06 13:36:35 -0400
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-04-12 14:04:20 -0400
commit | 3c5ee8b721d3ea5b18d22527fcbea4e2e15e79ad (patch)
tree | de1c103b277da2e69f0dce58b13125d8afcd8146 /src/mongo/s
parent | 9c2e3c15fc7874ea610bad23d9cafe46b22b1cb5 (diff)
download | mongo-3c5ee8b721d3ea5b18d22527fcbea4e2e15e79ad.tar.gz
SERVER-22667 Move Chunk::pickSplitVector/MedianKey out of Chunk
The Chunk::pickSplitVector and Chunk::pickMedianKey utilities logically belong to the
balancer, so they are moved out of the Chunk class and into shard_util (as
shardutil::selectChunkSplitPoints and shardutil::selectMedianKey).

The changes to balance.cpp have no functional effect; they only rearrange the
methods to better match the balancer control flow and to make the subsequent
changes easier to follow.
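For orientation before reading the diff, the relocated helpers end up with the following shape. This is a condensed excerpt of the declarations added to src/mongo/s/shard_util.h in the last hunk below; the comments are editorial summaries of the header's doc comments:

```cpp
namespace mongo {
namespace shardutil {

// Total data size across all databases on the shard, in bytes, via listDatabases.
StatusWith<long long> retrieveTotalShardSize(OperationContext* txn, const ShardId& shardId);

// Key that approximately divides the chunk [minKey, maxKey) in two, or an empty BSONObj
// if the shard reports no split key.
StatusWith<BSONObj> selectMedianKey(OperationContext* txn,
                                    const ShardId& shardId,
                                    const NamespaceString& nss,
                                    const ShardKeyPattern& shardKeyPattern,
                                    const BSONObj& minKey,
                                    const BSONObj& maxKey);

// Split points for the chunk [minKey, maxKey), targeting chunkSizeBytes per chunk.
// Zero for maxPoints/maxObjs means no limit.
StatusWith<std::vector<BSONObj>> selectChunkSplitPoints(OperationContext* txn,
                                                        const ShardId& shardId,
                                                        const NamespaceString& nss,
                                                        const ShardKeyPattern& shardKeyPattern,
                                                        const BSONObj& minKey,
                                                        const BSONObj& maxKey,
                                                        long long chunkSizeBytes,
                                                        int maxPoints,
                                                        int maxObjs);

}  // namespace shardutil
}  // namespace mongo
```

On the Chunk side, pickMedianKey, pickSplitVector and the public determineSplitPoints are removed; the new private Chunk::_determineSplitPoints delegates to selectMedianKey and selectChunkSplitPoints instead. In addition, shardutil::retrieveTotalShardSize drops its ShardRegistry* parameter and obtains the registry via Grid::get(txn)->shardRegistry().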
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/balance.cpp | 458
-rw-r--r-- | src/mongo/s/balance.h | 33
-rw-r--r-- | src/mongo/s/balancer_policy.cpp | 4
-rw-r--r-- | src/mongo/s/catalog/catalog_manager_common.cpp | 5
-rw-r--r-- | src/mongo/s/chunk.cpp | 120
-rw-r--r-- | src/mongo/s/chunk.h | 24
-rw-r--r-- | src/mongo/s/chunk_manager.cpp | 24
-rw-r--r-- | src/mongo/s/grid.cpp | 6
-rw-r--r-- | src/mongo/s/grid.h | 6
-rw-r--r-- | src/mongo/s/shard_util.cpp | 86
-rw-r--r-- | src/mongo/s/shard_util.h | 42
11 files changed, 433 insertions, 375 deletions
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index 115a84eebf4..2a2cbbd0804 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -152,7 +152,7 @@ MONGO_FP_DECLARE(balancerRoundIntervalSetting); } // namespace -Balancer::Balancer() : _balancedLastTime(0), _policy(new BalancerPolicy()) {} +Balancer::Balancer() : _balancedLastTime(0) {} Balancer::~Balancer() = default; @@ -160,132 +160,181 @@ Balancer* Balancer::get(OperationContext* operationContext) { return &getBalancer(operationContext->getServiceContext()); } -int Balancer::_moveChunks(OperationContext* txn, - const vector<MigrateInfo>& candidateChunks, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete) { - int movedCount = 0; - - for (const auto& migrateInfo : candidateChunks) { - // If the balancer was disabled since we started this round, don't start new chunks - // moves. - const auto balSettingsResult = - grid.catalogManager(txn)->getGlobalSettings(txn, SettingsType::BalancerDocKey); - - const bool isBalSettingsAbsent = - balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; +void Balancer::run() { + Client::initThread("Balancer"); - if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { - warning() << balSettingsResult.getStatus(); - return movedCount; + // This is the body of a BackgroundJob so if we throw here we're basically ending the balancer + // thread prematurely. + while (!inShutdown()) { + auto txn = cc().makeOperationContext(); + if (!_init(txn.get())) { + log() << "will retry to initialize balancer in one minute"; + sleepsecs(60); + continue; } - const SettingsType& balancerConfig = - isBalSettingsAbsent ? SettingsType{} : balSettingsResult.getValue(); + break; + } - if ((!isBalSettingsAbsent && !Chunk::shouldBalance(balancerConfig)) || - MONGO_FAIL_POINT(skipBalanceRound)) { - LOG(1) << "Stopping balancing round early as balancing was disabled"; - return movedCount; - } + Seconds balanceRoundInterval(kBalanceRoundDefaultInterval); - // Changes to metadata, borked metadata, and connectivity problems between shards - // should cause us to abort this chunk move, but shouldn't cause us to abort the entire - // round of chunks. - // - // TODO(spencer): We probably *should* abort the whole round on issues communicating - // with the config servers, but its impossible to distinguish those types of failures - // at the moment. - // - // TODO: Handle all these things more cleanly, since they're expected problems + while (!inShutdown()) { + auto txn = cc().makeOperationContext(); - const NamespaceString nss(migrateInfo.ns); + BalanceRoundDetails roundDetails; try { - shared_ptr<DBConfig> cfg = - uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString())); - - // NOTE: We purposely do not reload metadata here, since _doBalanceRound already - // tried to do so once. - shared_ptr<ChunkManager> cm = cfg->getChunkManager(txn, migrateInfo.ns); - uassert(28628, - str::stream() - << "Collection " << migrateInfo.ns - << " was deleted while balancing was active. 
Aborting balancing round.", - cm); + // ping has to be first so we keep things in the config server in sync + _ping(txn.get()); - ChunkPtr c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); + MONGO_FAIL_POINT_BLOCK(balancerRoundIntervalSetting, scopedBalancerRoundInterval) { + const BSONObj& data = scopedBalancerRoundInterval.getData(); + balanceRoundInterval = Seconds(data["sleepSecs"].numberInt()); + } - if (c->getMin().woCompare(migrateInfo.chunk.min) || - c->getMax().woCompare(migrateInfo.chunk.max)) { - // Likely a split happened somewhere, so force reload the chunk manager - cm = cfg->getChunkManager(txn, migrateInfo.ns, true); - invariant(cm); + BSONObj balancerResult; - c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); + // use fresh shard state + grid.shardRegistry()->reload(txn.get()); - if (c->getMin().woCompare(migrateInfo.chunk.min) || - c->getMax().woCompare(migrateInfo.chunk.max)) { - log() << "chunk mismatch after reload, ignoring will retry issue " - << migrateInfo.chunk.toString(); + // refresh chunk size (even though another balancer might be active) + Chunk::refreshChunkSize(txn.get()); - continue; - } + auto balSettingsResult = grid.catalogManager(txn.get())->getGlobalSettings( + txn.get(), SettingsType::BalancerDocKey); + const bool isBalSettingsAbsent = + balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; + if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { + warning() << balSettingsResult.getStatus(); + return; } - BSONObj res; - if (c->moveAndCommit(txn, - migrateInfo.to, - Chunk::MaxChunkSize, - secondaryThrottle, - waitForDelete, - 0, /* maxTimeMS */ - res)) { - movedCount++; + const SettingsType& balancerConfig = + isBalSettingsAbsent ? SettingsType{} : balSettingsResult.getValue(); + + // now make sure we should even be running + if ((!isBalSettingsAbsent && !Chunk::shouldBalance(balancerConfig)) || + MONGO_FAIL_POINT(skipBalanceRound)) { + LOG(1) << "skipping balancing round because balancing is disabled"; + + // Ping again so scripts can determine if we're active without waiting + _ping(txn.get(), true); + + sleepFor(balanceRoundInterval); continue; } - // The move requires acquiring the collection metadata's lock, which can fail. - log() << "balancer move failed: " << res << " from: " << migrateInfo.from - << " to: " << migrateInfo.to << " chunk: " << migrateInfo.chunk; + uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get())); - Status moveStatus = getStatusFromCommandResult(res); + { + auto scopedDistLock = grid.catalogManager(txn.get()) + ->distLock(txn.get(), + "balancer", + "doing balance round", + DistLockManager::kSingleLockAttemptTimeout); - if (moveStatus == ErrorCodes::ChunkTooBig || res["chunkTooBig"].trueValue()) { - // Reload just to be safe - cm = cfg->getChunkManager(txn, migrateInfo.ns); - invariant(cm); + if (!scopedDistLock.isOK()) { + LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus()); - c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); + // Ping again so scripts can determine if we're active without waiting + _ping(txn.get(), true); - log() << "performing a split because migrate failed for size reasons"; + sleepFor(balanceRoundInterval); // no need to wake up soon + continue; + } - Status status = c->split(txn, Chunk::normal, NULL, NULL); - log() << "split results: " << status; + const bool waitForDelete = + (balancerConfig.isWaitForDeleteSet() ? 
balancerConfig.getWaitForDelete() + : false); - if (!status.isOK()) { - log() << "marking chunk as jumbo: " << c->toString(); + MigrationSecondaryThrottleOptions secondaryThrottle( + MigrationSecondaryThrottleOptions::create( + MigrationSecondaryThrottleOptions::kDefault)); + if (balancerConfig.isKeySet()) { + secondaryThrottle = + uassertStatusOK(MigrationSecondaryThrottleOptions::createFromBalancerConfig( + balancerConfig.toBSON())); + } - c->markAsJumbo(txn); + LOG(1) << "*** start balancing round. " + << "waitForDelete: " << waitForDelete + << ", secondaryThrottle: " << secondaryThrottle.toBSON(); - // We increment moveCount so we do another round right away - movedCount++; + const auto candidateChunks = uassertStatusOK(_getCandidateChunks(txn.get())); + + if (candidateChunks.empty()) { + LOG(1) << "no need to move any chunk"; + _balancedLastTime = 0; + } else { + _balancedLastTime = + _moveChunks(txn.get(), candidateChunks, secondaryThrottle, waitForDelete); + + roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()), + _balancedLastTime); + + grid.catalogManager(txn.get()) + ->logAction(txn.get(), "balancer.round", "", roundDetails.toBSON()); } + + LOG(1) << "*** End of balancing round"; } - } catch (const DBException& ex) { - warning() << "could not move chunk " << migrateInfo.chunk.toString() - << ", continuing balancing round" << causedBy(ex); + + // Ping again so scripts can determine if we're active without waiting + _ping(txn.get(), true); + + sleepFor(_balancedLastTime ? kShortBalanceRoundInterval : balanceRoundInterval); + } catch (const std::exception& e) { + log() << "caught exception while doing balance: " << e.what(); + + // Just to match the opening statement if in log level 1 + LOG(1) << "*** End of balancing round"; + + // This round failed, tell the world! + roundDetails.setFailed(e.what()); + + grid.catalogManager(txn.get()) + ->logAction(txn.get(), "balancer.round", "", roundDetails.toBSON()); + + // Sleep a fair amount before retrying because of the error + sleepFor(balanceRoundInterval); } } +} - return movedCount; +bool Balancer::_init(OperationContext* txn) { + log() << "about to contact config servers and shards"; + + try { + // Contact the config server and refresh shard information. 
Checks that each shard is indeed + // a different process (no hostname mixup) + // these checks are redundant in that they're redone at every new round but we want to do + // them initially here so to catch any problem soon + grid.shardRegistry()->reload(txn); + if (!_checkOIDs(txn)) { + return false; + } + + log() << "config servers and shards contacted successfully"; + + StringBuilder buf; + buf << getHostNameCached() << ":" << serverGlobalParams.port; + _myid = buf.str(); + + log() << "balancer id: " << _myid << " started"; + + return true; + } catch (const std::exception& e) { + warning() << "could not initialize balancer, please check that all shards and config " + "servers are up: " << e.what(); + return false; + } } void Balancer::_ping(OperationContext* txn, bool waiting) { MongosType mType; mType.setName(_myid); mType.setPing(jsTime()); - mType.setUptime(static_cast<int>(time(0) - _started)); + mType.setUptime(_timer.seconds()); mType.setWaiting(waiting); mType.setMongoVersion(versionString); @@ -350,7 +399,7 @@ bool Balancer::_checkOIDs(OperationContext* txn) { return true; } -StatusWith<std::vector<MigrateInfo>> Balancer::_doBalanceRound(OperationContext* txn) { +StatusWith<vector<MigrateInfo>> Balancer::_getCandidateChunks(OperationContext* txn) { vector<CollectionType> collections; Status collsStatus = @@ -498,7 +547,7 @@ StatusWith<std::vector<MigrateInfo>> Balancer::_doBalanceRound(OperationContext* } shared_ptr<MigrateInfo> migrateInfo( - _policy->balance(nss.ns(), distStatus, _balancedLastTime)); + BalancerPolicy::balance(nss.ns(), distStatus, _balancedLastTime)); if (migrateInfo) { candidateChunks.emplace_back(*migrateInfo); } @@ -507,176 +556,125 @@ StatusWith<std::vector<MigrateInfo>> Balancer::_doBalanceRound(OperationContext* return candidateChunks; } -bool Balancer::_init(OperationContext* txn) { - try { - log() << "about to contact config servers and shards"; - - // contact the config server and refresh shard information - // checks that each shard is indeed a different process (no hostname mixup) - // these checks are redundant in that they're redone at every new round but we want to do - // them initially here so to catch any problem soon - grid.shardRegistry()->reload(txn); - if (!_checkOIDs(txn)) { - return false; - } - - log() << "config servers and shards contacted successfully"; - - StringBuilder buf; - buf << getHostNameCached() << ":" << serverGlobalParams.port; - _myid = buf.str(); - _started = time(0); - - log() << "balancer id: " << _myid << " started"; - - return true; +int Balancer::_moveChunks(OperationContext* txn, + const vector<MigrateInfo>& candidateChunks, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + int movedCount = 0; - } catch (std::exception& e) { - warning() << "could not initialize balancer, please check that all shards and config " - "servers are up: " << e.what(); - return false; - } -} + for (const auto& migrateInfo : candidateChunks) { + // If the balancer was disabled since we started this round, don't start new chunks + // moves. + const auto balSettingsResult = + grid.catalogManager(txn)->getGlobalSettings(txn, SettingsType::BalancerDocKey); -void Balancer::run() { - Client::initThread("Balancer"); + const bool isBalSettingsAbsent = + balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; - // This is the body of a BackgroundJob so if we throw here we're basically ending the balancer - // thread prematurely. 
- while (!inShutdown()) { - auto txn = cc().makeOperationContext(); - if (!_init(txn.get())) { - log() << "will retry to initialize balancer in one minute"; - sleepsecs(60); - continue; + if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { + warning() << balSettingsResult.getStatus(); + return movedCount; } - break; - } + const SettingsType& balancerConfig = + isBalSettingsAbsent ? SettingsType{} : balSettingsResult.getValue(); - Seconds balanceRoundInterval(kBalanceRoundDefaultInterval); + if ((!isBalSettingsAbsent && !Chunk::shouldBalance(balancerConfig)) || + MONGO_FAIL_POINT(skipBalanceRound)) { + LOG(1) << "Stopping balancing round early as balancing was disabled"; + return movedCount; + } - while (!inShutdown()) { - auto txn = cc().makeOperationContext(); + // Changes to metadata, borked metadata, and connectivity problems between shards + // should cause us to abort this chunk move, but shouldn't cause us to abort the entire + // round of chunks. + // + // TODO(spencer): We probably *should* abort the whole round on issues communicating + // with the config servers, but its impossible to distinguish those types of failures + // at the moment. + // + // TODO: Handle all these things more cleanly, since they're expected problems - BalanceRoundDetails roundDetails; + const NamespaceString nss(migrateInfo.ns); try { - // ping has to be first so we keep things in the config server in sync - _ping(txn.get()); - - MONGO_FAIL_POINT_BLOCK(balancerRoundIntervalSetting, scopedBalancerRoundInterval) { - const BSONObj& data = scopedBalancerRoundInterval.getData(); - balanceRoundInterval = Seconds(data["sleepSecs"].numberInt()); - } - - BSONObj balancerResult; + shared_ptr<DBConfig> cfg = + uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString())); - // use fresh shard state - grid.shardRegistry()->reload(txn.get()); + // NOTE: We purposely do not reload metadata here, since _getCandidateChunks already + // tried to do so once + shared_ptr<ChunkManager> cm = cfg->getChunkManager(txn, migrateInfo.ns); + uassert(28628, + str::stream() + << "Collection " << migrateInfo.ns + << " was deleted while balancing was active. Aborting balancing round.", + cm); - // refresh chunk size (even though another balancer might be active) - Chunk::refreshChunkSize(txn.get()); + ChunkPtr c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); - auto balSettingsResult = grid.catalogManager(txn.get())->getGlobalSettings( - txn.get(), SettingsType::BalancerDocKey); - const bool isBalSettingsAbsent = - balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; - if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { - warning() << balSettingsResult.getStatus(); - return; - } + if (c->getMin().woCompare(migrateInfo.chunk.min) || + c->getMax().woCompare(migrateInfo.chunk.max)) { + // Likely a split happened somewhere, so force reload the chunk manager + cm = cfg->getChunkManager(txn, migrateInfo.ns, true); + invariant(cm); - const SettingsType& balancerConfig = - isBalSettingsAbsent ? 
SettingsType{} : balSettingsResult.getValue(); + c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); - // now make sure we should even be running - if ((!isBalSettingsAbsent && !Chunk::shouldBalance(balancerConfig)) || - MONGO_FAIL_POINT(skipBalanceRound)) { - LOG(1) << "skipping balancing round because balancing is disabled"; + if (c->getMin().woCompare(migrateInfo.chunk.min) || + c->getMax().woCompare(migrateInfo.chunk.max)) { + log() << "chunk mismatch after reload, ignoring will retry issue " + << migrateInfo.chunk.toString(); - // Ping again so scripts can determine if we're active without waiting - _ping(txn.get(), true); + continue; + } + } - sleepFor(balanceRoundInterval); + BSONObj res; + if (c->moveAndCommit(txn, + migrateInfo.to, + Chunk::MaxChunkSize, + secondaryThrottle, + waitForDelete, + 0, /* maxTimeMS */ + res)) { + movedCount++; continue; } - uassert(13258, "oids broken after resetting!", _checkOIDs(txn.get())); - - { - auto scopedDistLock = grid.catalogManager(txn.get()) - ->distLock(txn.get(), - "balancer", - "doing balance round", - DistLockManager::kSingleLockAttemptTimeout); - - if (!scopedDistLock.isOK()) { - LOG(1) << "skipping balancing round" << causedBy(scopedDistLock.getStatus()); - - // Ping again so scripts can determine if we're active without waiting - _ping(txn.get(), true); + // The move requires acquiring the collection metadata's lock, which can fail. + log() << "balancer move failed: " << res << " from: " << migrateInfo.from + << " to: " << migrateInfo.to << " chunk: " << migrateInfo.chunk; - sleepFor(balanceRoundInterval); // no need to wake up soon - continue; - } + Status moveStatus = getStatusFromCommandResult(res); - const bool waitForDelete = - (balancerConfig.isWaitForDeleteSet() ? balancerConfig.getWaitForDelete() - : false); + if (moveStatus == ErrorCodes::ChunkTooBig || res["chunkTooBig"].trueValue()) { + // Reload just to be safe + cm = cfg->getChunkManager(txn, migrateInfo.ns); + invariant(cm); - MigrationSecondaryThrottleOptions secondaryThrottle( - MigrationSecondaryThrottleOptions::create( - MigrationSecondaryThrottleOptions::kDefault)); - if (balancerConfig.isKeySet()) { - secondaryThrottle = - uassertStatusOK(MigrationSecondaryThrottleOptions::createFromBalancerConfig( - balancerConfig.toBSON())); - } + c = cm->findIntersectingChunk(txn, migrateInfo.chunk.min); - LOG(1) << "*** start balancing round. " - << "waitForDelete: " << waitForDelete - << ", secondaryThrottle: " << secondaryThrottle.toBSON(); + log() << "performing a split because migrate failed for size reasons"; - const auto candidateChunks = uassertStatusOK(_doBalanceRound(txn.get())); + Status status = c->split(txn, Chunk::normal, NULL, NULL); + log() << "split results: " << status; - if (candidateChunks.empty()) { - LOG(1) << "no need to move any chunk"; - _balancedLastTime = 0; - } else { - _balancedLastTime = - _moveChunks(txn.get(), candidateChunks, secondaryThrottle, waitForDelete); + if (!status.isOK()) { + log() << "marking chunk as jumbo: " << c->toString(); - roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()), - _balancedLastTime); + c->markAsJumbo(txn); - grid.catalogManager(txn.get()) - ->logAction(txn.get(), "balancer.round", "", roundDetails.toBSON()); + // We increment moveCount so we do another round right away + movedCount++; } - - LOG(1) << "*** End of balancing round"; } - - // Ping again so scripts can determine if we're active without waiting - _ping(txn.get(), true); - - sleepFor(_balancedLastTime ? 
kShortBalanceRoundInterval : balanceRoundInterval); - } catch (const std::exception& e) { - log() << "caught exception while doing balance: " << e.what(); - - // Just to match the opening statement if in log level 1 - LOG(1) << "*** End of balancing round"; - - // This round failed, tell the world! - roundDetails.setFailed(e.what()); - - grid.catalogManager(txn.get()) - ->logAction(txn.get(), "balancer.round", "", roundDetails.toBSON()); - - // Sleep a fair amount before retrying because of the error - sleepFor(balanceRoundInterval); + } catch (const DBException& ex) { + warning() << "could not move chunk " << migrateInfo.chunk.toString() + << ", continuing balancing round" << causedBy(ex); } } + + return movedCount; } } // namespace mongo diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index cf001ad9d3d..17b58384590 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -32,10 +32,10 @@ #include <vector> #include "mongo/util/background.h" +#include "mongo/util/timer.h" namespace mongo { -class BalancerPolicy; struct MigrateInfo; class MigrationSecondaryThrottleOptions; class OperationContext; @@ -81,12 +81,23 @@ private: bool _init(OperationContext* txn); /** + * Marks this balancer as being live on the config server(s). + */ + void _ping(OperationContext* txn, bool waiting = false); + + /** + * Returns true if all the servers listed in configdb as being shards are reachable and are + * distinct processes (no hostname mixup). + */ + bool _checkOIDs(OperationContext* txn); + + /** * Gathers all the necessary information about shards and chunks, and decides whether there are * candidate chunks to be moved. * * Returns candidate chunks, one per collection, that could possibly be moved */ - StatusWith<std::vector<MigrateInfo>> _doBalanceRound(OperationContext* txn); + StatusWith<std::vector<MigrateInfo>> _getCandidateChunks(OperationContext* txn); /** * Issues chunk migration request, one at a time. @@ -101,28 +112,14 @@ private: const MigrationSecondaryThrottleOptions& secondaryThrottle, bool waitForDelete); - /** - * Marks this balancer as being live on the config server(s). - */ - void _ping(OperationContext* txn, bool waiting = false); - - /** - * @return true if all the servers listed in configdb as being shards are reachable and are - * distinct processes - */ - bool _checkOIDs(OperationContext* txn); - // hostname:port of my mongos std::string _myid; - // time the Balancer started running - time_t _started; + // Time the Balancer started running + Timer _timer; // number of moved chunks in last round int _balancedLastTime; - - // decide which chunks to move; owned here. 
- std::unique_ptr<BalancerPolicy> _policy; }; } // namespace mongo diff --git a/src/mongo/s/balancer_policy.cpp b/src/mongo/s/balancer_policy.cpp index e4ac76a0db9..1a307df60b2 100644 --- a/src/mongo/s/balancer_policy.cpp +++ b/src/mongo/s/balancer_policy.cpp @@ -284,8 +284,8 @@ StatusWith<ShardInfoMap> DistributionStatus::populateShardInfoMap(OperationConte for (const ShardType& shardData : shards) { std::set<std::string> dummy; - const long long shardSizeBytes = uassertStatusOK( - shardutil::retrieveTotalShardSize(txn, shardData.getName(), grid.shardRegistry())); + const long long shardSizeBytes = + uassertStatusOK(shardutil::retrieveTotalShardSize(txn, shardData.getName())); const std::string shardMongodVersion = retrieveShardMongoDVersion(txn, shardData.getName(), grid.shardRegistry()); diff --git a/src/mongo/s/catalog/catalog_manager_common.cpp b/src/mongo/s/catalog/catalog_manager_common.cpp index dad99c9f3ee..1079ec02ce2 100644 --- a/src/mongo/s/catalog/catalog_manager_common.cpp +++ b/src/mongo/s/catalog/catalog_manager_common.cpp @@ -505,8 +505,7 @@ StatusWith<ShardId> CatalogManagerCommon::selectShardForNewDatabase(OperationCon ShardId candidateShardId = allShardIds[0]; - auto candidateSizeStatus = - shardutil::retrieveTotalShardSize(txn, candidateShardId, shardRegistry); + auto candidateSizeStatus = shardutil::retrieveTotalShardSize(txn, candidateShardId); if (!candidateSizeStatus.isOK()) { return candidateSizeStatus.getStatus(); } @@ -514,7 +513,7 @@ StatusWith<ShardId> CatalogManagerCommon::selectShardForNewDatabase(OperationCon for (size_t i = 1; i < allShardIds.size(); i++) { const ShardId shardId = allShardIds[i]; - const auto sizeStatus = shardutil::retrieveTotalShardSize(txn, shardId, shardRegistry); + const auto sizeStatus = shardutil::retrieveTotalShardSize(txn, shardId); if (!sizeStatus.isOK()) { return sizeStatus.getStatus(); } diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 982ccffb2d1..60ef9294cf9 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -52,7 +52,7 @@ #include "mongo/s/grid.h" #include "mongo/s/migration_secondary_throttle_options.h" #include "mongo/s/move_chunk_request.h" -#include "mongo/s/shard_key_pattern.h" +#include "mongo/s/shard_util.h" #include "mongo/util/log.h" namespace mongo { @@ -304,98 +304,54 @@ BSONObj Chunk::_getExtremeKey(OperationContext* txn, bool doSplitAtLower) const return _manager->getShardKeyPattern().extractShardKeyFromDoc(end); } -void Chunk::pickMedianKey(OperationContext* txn, BSONObj& medianKey) const { - // Ask the mongod holding this chunk to figure out the split points. 
- ScopedDbConnection conn(_getShardConnectionString(txn)); - BSONObj result; - BSONObjBuilder cmd; - cmd.append("splitVector", _manager->getns()); - cmd.append("keyPattern", _manager->getShardKeyPattern().toBSON()); - cmd.append("min", getMin()); - cmd.append("max", getMax()); - cmd.appendBool("force", true); - BSONObj cmdObj = cmd.obj(); - - if (!conn->runCommand("admin", cmdObj, result)) { - conn.done(); - ostringstream os; - os << "splitVector command (median key) failed: " << result; - uassert(13503, os.str(), 0); - } - - BSONObjIterator it(result.getObjectField("splitKeys")); - if (it.more()) { - medianKey = it.next().Obj().getOwned(); - } - - conn.done(); -} - -void Chunk::pickSplitVector(OperationContext* txn, - vector<BSONObj>& splitPoints, - long long chunkSize /* bytes */, - int maxPoints, - int maxObjs) const { - BSONObjBuilder cmd; - cmd.append("splitVector", _manager->getns()); - cmd.append("keyPattern", _manager->getShardKeyPattern().toBSON()); - cmd.append("min", getMin()); - cmd.append("max", getMax()); - cmd.append("maxChunkSizeBytes", chunkSize); - cmd.append("maxSplitPoints", maxPoints); - cmd.append("maxChunkObjects", maxObjs); - - BSONObj cmdObj = cmd.obj(); - - auto result = grid.shardRegistry()->runIdempotentCommandOnShard( - txn, - getShardId(), - ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, - "admin", - cmdObj); - - uassertStatusOK(result.getStatus()); - uassertStatusOK(getStatusFromCommandResult(result.getValue())); - - BSONObjIterator it(result.getValue().getObjectField("splitKeys")); - while (it.more()) { - splitPoints.push_back(it.next().Obj().getOwned()); - } -} +std::vector<BSONObj> Chunk::_determineSplitPoints(OperationContext* txn, bool atMedian) const { + // If splitting is not obligatory we may return early if there are not enough data we cap the + // number of objects that would fall in the first half (before the split point) the rationale is + // we'll find a split point without traversing all the data. + vector<BSONObj> splitPoints; -void Chunk::determineSplitPoints(OperationContext* txn, - bool atMedian, - vector<BSONObj>* splitPoints) const { - // if splitting is not obligatory we may return early if there are not enough data - // we cap the number of objects that would fall in the first half (before the split point) - // the rationale is we'll find a split point without traversing all the data if (atMedian) { - BSONObj medianKey; - pickMedianKey(txn, medianKey); - if (!medianKey.isEmpty()) - splitPoints->push_back(medianKey); + BSONObj medianKey = + uassertStatusOK(shardutil::selectMedianKey(txn, + _shardId, + NamespaceString(_manager->getns()), + _manager->getShardKeyPattern(), + _min, + _max)); + if (!medianKey.isEmpty()) { + splitPoints.push_back(medianKey); + } } else { long long chunkSize = _manager->getCurrentDesiredChunkSize(); // Note: One split point for every 1/2 chunk size. const int estNumSplitPoints = _dataWritten / chunkSize * 2; - if (estNumSplitPoints >= kTooManySplitPoints) { - // The current desired chunk size will split the chunk into lots of small chunks - // (At the worst case, this can result into thousands of chunks); so check and - // see if a bigger value can be used. + if (estNumSplitPoints >= kTooManySplitPoints) { + // The current desired chunk size will split the chunk into lots of small chunk and at + // the worst case this can result into thousands of chunks. So check and see if a bigger + // value can be used. 
chunkSize = std::min(_dataWritten, Chunk::MaxChunkSize); } - pickSplitVector(txn, *splitPoints, chunkSize, 0, MaxObjectPerChunk); - - if (splitPoints->size() <= 1) { - // no split points means there isn't enough data to split on - // 1 split point means we have between half the chunk size to full chunk size - // so we shouldn't split - splitPoints->clear(); + splitPoints = + uassertStatusOK(shardutil::selectChunkSplitPoints(txn, + _shardId, + NamespaceString(_manager->getns()), + _manager->getShardKeyPattern(), + _min, + _max, + chunkSize, + 0, + MaxObjectPerChunk)); + if (splitPoints.size() <= 1) { + // No split points means there isn't enough data to split on 1 split point means we have + // between half the chunk size to full chunk size so we shouldn't split. + splitPoints.clear(); } } + + return splitPoints; } Status Chunk::split(OperationContext* txn, @@ -408,9 +364,7 @@ Status Chunk::split(OperationContext* txn, } bool atMedian = mode == Chunk::atMedian; - vector<BSONObj> splitPoints; - - determineSplitPoints(txn, atMedian, &splitPoints); + vector<BSONObj> splitPoints = _determineSplitPoints(txn, atMedian); if (splitPoints.empty()) { string msg; if (atMedian) { diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index 0adb301bf23..553c0bbec9d 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -160,26 +160,6 @@ public: const std::vector<BSONObj>& splitPoints, BSONObj* res) const; - /** - * Asks the mongod holding this chunk to find a key that approximately divides this chunk in two - * - * @param medianKey the key that divides this chunk, if there is one, or empty - */ - void pickMedianKey(OperationContext* txn, BSONObj& medianKey) const; - - /** - * Ask the mongod holding this chunk to figure out the split points. - * @param splitPoints vector to be filled in - * @param chunkSize chunk size to target in bytes - * @param maxPoints limits the number of split points that are needed, zero is max (optional) - * @param maxObjs limits the number of objects in each chunk, zero is as max (optional) - */ - void pickSplitVector(OperationContext* txn, - std::vector<BSONObj>& splitPoints, - long long chunkSize, - int maxPoints = 0, - int maxObjs = 0) const; - // // migration support // @@ -300,9 +280,7 @@ private: * @param atMedian perform a single split at the middle of this chunk. * @param splitPoints out parameter containing the chosen split points. Can be empty. 
*/ - void determineSplitPoints(OperationContext* txn, - bool atMedian, - std::vector<BSONObj>* splitPoints) const; + std::vector<BSONObj> _determineSplitPoints(OperationContext* txn, bool atMedian) const; /** * initializes _dataWritten with a random value so that a mongos restart diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 2b1f9eef85d..5728cfe07db 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -54,6 +54,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" +#include "mongo/s/shard_util.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" @@ -347,14 +348,9 @@ void ChunkManager::calcInitSplitsAndShards(OperationContext* txn, const set<ShardId>* initShardIds, vector<BSONObj>* splitPoints, vector<ShardId>* shardIds) const { - verify(_chunkMap.size() == 0); + invariant(_chunkMap.empty()); - Chunk c(this, - _keyPattern.getKeyPattern().globalMin(), - _keyPattern.getKeyPattern().globalMax(), - primaryShardId); - - if (!initPoints || !initPoints->size()) { + if (!initPoints || initPoints->empty()) { // discover split points const auto primaryShard = grid.shardRegistry()->getShard(txn, primaryShardId); const NamespaceString nss{getns()}; @@ -370,8 +366,18 @@ void ChunkManager::calcInitSplitsAndShards(OperationContext* txn, uassertStatusOK(getStatusFromCommandResult(result.getValue())); uassertStatusOK(bsonExtractIntegerField(result.getValue(), "n", &numObjects)); - if (numObjects > 0) - c.pickSplitVector(txn, *splitPoints, Chunk::MaxChunkSize); + if (numObjects > 0) { + *splitPoints = uassertStatusOK( + shardutil::selectChunkSplitPoints(txn, + primaryShardId, + NamespaceString(_ns), + _keyPattern, + _keyPattern.getKeyPattern().globalMin(), + _keyPattern.getKeyPattern().globalMax(), + Chunk::MaxChunkSize, + 0, + 0)); + } // since docs already exists, must use primary shard shardIds->push_back(primaryShardId); diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index d584b87c0c5..9d36641bd05 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -46,6 +46,12 @@ Grid grid; Grid::Grid() : _allowLocalShard(true) {} +Grid::~Grid() = default; + +Grid* Grid::get(OperationContext* operationContext) { + return &grid; +} + void Grid::init(std::unique_ptr<CatalogManager> catalogManager, std::unique_ptr<CatalogCache> catalogCache, std::unique_ptr<ShardRegistry> shardRegistry, diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index 1db22f630e8..7c96ab4e5cb 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -47,6 +47,12 @@ class ShardRegistry; class Grid { public: Grid(); + ~Grid(); + + /** + * Retrieves the instance of Grid associated with the current service context. + */ + static Grid* get(OperationContext* operationContext); /** * Called at startup time so the global sharding services can be set. 
This method must be called diff --git a/src/mongo/s/shard_util.cpp b/src/mongo/s/shard_util.cpp index 777c33db427..9b51baa1e22 100644 --- a/src/mongo/s/shard_util.cpp +++ b/src/mongo/s/shard_util.cpp @@ -33,15 +33,18 @@ #include "mongo/base/status_with.h" #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" +#include "mongo/db/namespace_string.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_key_pattern.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace shardutil { -StatusWith<long long> retrieveTotalShardSize(OperationContext* txn, - ShardId shardId, - ShardRegistry* shardRegistry) { +StatusWith<long long> retrieveTotalShardSize(OperationContext* txn, const ShardId& shardId) { + auto shardRegistry = Grid::get(txn)->shardRegistry(); auto listDatabasesStatus = shardRegistry->runIdempotentCommandOnShard( txn, shardId, @@ -60,5 +63,82 @@ StatusWith<long long> retrieveTotalShardSize(OperationContext* txn, return totalSizeElem.numberLong(); } +StatusWith<BSONObj> selectMedianKey(OperationContext* txn, + const ShardId& shardId, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const BSONObj& minKey, + const BSONObj& maxKey) { + BSONObjBuilder cmd; + cmd.append("splitVector", nss.ns()); + cmd.append("keyPattern", shardKeyPattern.toBSON()); + cmd.append("min", minKey); + cmd.append("max", maxKey); + cmd.appendBool("force", true); + + auto shardRegistry = Grid::get(txn)->shardRegistry(); + auto cmdStatus = shardRegistry->runIdempotentCommandOnShard( + txn, shardId, ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, "admin", cmd.obj()); + if (!cmdStatus.isOK()) { + return cmdStatus.getStatus(); + } + + const auto response = std::move(cmdStatus.getValue()); + + Status status = getStatusFromCommandResult(response); + if (!status.isOK()) { + return status; + } + + BSONObjIterator it(response.getObjectField("splitKeys")); + if (it.more()) { + return it.next().Obj().getOwned(); + } + + return BSONObj(); +} + +StatusWith<std::vector<BSONObj>> selectChunkSplitPoints(OperationContext* txn, + const ShardId& shardId, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const BSONObj& minKey, + const BSONObj& maxKey, + long long chunkSizeBytes, + int maxPoints, + int maxObjs) { + BSONObjBuilder cmd; + cmd.append("splitVector", nss.ns()); + cmd.append("keyPattern", shardKeyPattern.toBSON()); + cmd.append("min", minKey); + cmd.append("max", maxKey); + cmd.append("maxChunkSizeBytes", chunkSizeBytes); + cmd.append("maxSplitPoints", maxPoints); + cmd.append("maxChunkObjects", maxObjs); + + auto shardRegistry = Grid::get(txn)->shardRegistry(); + auto cmdStatus = shardRegistry->runIdempotentCommandOnShard( + txn, shardId, ReadPreferenceSetting{ReadPreference::PrimaryPreferred}, "admin", cmd.obj()); + if (!cmdStatus.isOK()) { + return cmdStatus.getStatus(); + } + + const auto response = std::move(cmdStatus.getValue()); + + Status status = getStatusFromCommandResult(response); + if (!status.isOK()) { + return status; + } + + std::vector<BSONObj> splitPoints; + + BSONObjIterator it(response.getObjectField("splitKeys")); + while (it.more()) { + splitPoints.push_back(it.next().Obj().getOwned()); + } + + return std::move(splitPoints); +} + } // namespace shardutil } // namespace mongo diff --git a/src/mongo/s/shard_util.h b/src/mongo/s/shard_util.h index 640dd09d5c1..aaba2f4ddf7 100644 --- 
a/src/mongo/s/shard_util.h +++ b/src/mongo/s/shard_util.h @@ -29,12 +29,15 @@ #pragma once #include <string> +#include <vector> #include "mongo/s/client/shard.h" namespace mongo { +class NamespaceString; class OperationContext; +class ShardKeyPattern; class ShardRegistry; template <typename T> class StatusWith; @@ -43,6 +46,7 @@ class StatusWith; * Set of methods used to introspect the state of individual shards. */ namespace shardutil { + /** * Executes the listDatabases command against the specified shard and obtains the total data * size across all databases in bytes (essentially, the totalSize field). @@ -51,9 +55,39 @@ namespace shardutil { * ShardNotFound if shard by that id is not available on the registry * NoSuchKey if the total shard size could not be retrieved */ -StatusWith<long long> retrieveTotalShardSize(OperationContext* txn, - ShardId shardId, - ShardRegistry* shardRegistry); -}; +StatusWith<long long> retrieveTotalShardSize(OperationContext* txn, const ShardId& shardId); + +/** + * Asks the mongod holding this chunk to find a key that approximately divides the specified chunk + * in two. + */ +StatusWith<BSONObj> selectMedianKey(OperationContext* txn, + const ShardId& shardId, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const BSONObj& minKey, + const BSONObj& maxKey); + +/** + * Ask the specified shard to figure out the split points for a given chunk. + * + * @param shardId The shard id to query. + * @param nss Namespace, which owns the chunk. + * @param shardKeyPattern The shard key which corresponds to this sharded namespace. + * @param minKey/maxKey Bounds of the chunk. + * @param chunkSize Chunk size to target in bytes. + * @param maxPoints Limits the number of split points that are needed. Zero means max. + * @param maxObjs Limits the number of objects in each chunk. Zero means max. + */ +StatusWith<std::vector<BSONObj>> selectChunkSplitPoints(OperationContext* txn, + const ShardId& shardId, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + const BSONObj& minKey, + const BSONObj& maxKey, + long long chunkSizeBytes, + int maxPoints, + int maxObjs); +} // namespace shardutil } // namespace mongo |
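For readers who only skim the new header, a minimal usage sketch of the split-point helper follows. The selectChunkSplitPoints signature is the one declared in shard_util.h above; the wrapper function, its name, and the include list are illustrative only (the actual call sites introduced by this commit are Chunk::_determineSplitPoints and ChunkManager::calcInitSplitsAndShards, which additionally cap the number of objects per chunk).

```cpp
#include <utility>
#include <vector>

#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/shard_util.h"

namespace mongo {

// Hypothetical helper, not part of this commit: asks the shard that owns the range
// [minKey, maxKey) for split points targeting roughly chunkSizeBytes per chunk.
// Passing 0 for maxSplitPoints/maxChunkObjects means "no limit" (see shard_util.h).
StatusWith<std::vector<BSONObj>> examplePickSplitPoints(OperationContext* txn,
                                                        const ShardId& shardId,
                                                        const NamespaceString& nss,
                                                        const ShardKeyPattern& shardKeyPattern,
                                                        const BSONObj& minKey,
                                                        const BSONObj& maxKey,
                                                        long long chunkSizeBytes) {
    auto splitPointsStatus = shardutil::selectChunkSplitPoints(
        txn, shardId, nss, shardKeyPattern, minKey, maxKey, chunkSizeBytes, 0, 0);
    if (!splitPointsStatus.isOK()) {
        // Propagate the error from the remote splitVector command.
        return splitPointsStatus.getStatus();
    }

    std::vector<BSONObj> splitPoints = std::move(splitPointsStatus.getValue());
    if (splitPoints.size() <= 1) {
        // Zero or one candidate point means there is not enough data to justify a split;
        // this mirrors the check in Chunk::_determineSplitPoints above.
        splitPoints.clear();
    }

    return std::move(splitPoints);
}

}  // namespace mongo
```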