diff options
Diffstat (limited to 'src/mongo/s')
44 files changed, 631 insertions, 397 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 7fc744d4367..acbc0dc08cf 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -115,7 +115,6 @@ env.Library( target='cluster_ops_impl', source=[ 'chunk_manager_targeter.cpp', - 'cluster_explain.cpp', 'cluster_write.cpp', 'dbclient_shard_resolver.cpp', ], @@ -163,6 +162,7 @@ env.Library( # This is only here temporarily for auto-split logic in chunk.cpp. 'balance.cpp', 'balancer_policy.cpp', + 'balancer/balancer_configuration.cpp', 'balancer/cluster_statistics.cpp', 'balancer/cluster_statistics_impl.cpp', 'catalog/catalog_cache.cpp', @@ -180,7 +180,6 @@ env.Library( 'catalog/catalog_types', 'catalog/replset/catalog_manager_replica_set', 'client/sharding_client', - 'cluster_ops_impl', 'common', ], LIBDEPS_TAGS=[ @@ -209,11 +208,8 @@ env.Library( target='mongoscore', source=[ 'cluster_cursor_stats.cpp', - 'request.cpp', 's_only.cpp', 's_sharding_server_status.cpp', - 'strategy.cpp', - 'version_mongos.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmongos', diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index cc1b71d804f..25df6654901 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -46,6 +46,7 @@ #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/balancer/cluster_statistics_impl.h" #include "mongo/s/balancer_policy.h" #include "mongo/s/catalog/catalog_cache.h" @@ -53,7 +54,6 @@ #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_mongos.h" -#include "mongo/s/catalog/type_settings.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/chunk_manager.h" @@ -203,29 +203,19 @@ void Balancer::run() { balanceRoundInterval = Seconds(data["sleepSecs"].numberInt()); } - BSONObj balancerResult; + // Use fresh shard state and balancer settings + Grid::get(txn.get())->shardRegistry()->reload(txn.get()); - // use fresh shard state - grid.shardRegistry()->reload(txn.get()); - - // refresh chunk size (even though another balancer might be active) - Chunk::refreshChunkSize(txn.get()); - - auto balSettingsResult = grid.catalogManager(txn.get())->getGlobalSettings( - txn.get(), SettingsType::BalancerDocKey); - const bool isBalSettingsAbsent = - balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; - if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { - warning() << balSettingsResult.getStatus(); - return; + auto balancerConfig = Grid::get(txn.get())->getBalancerConfiguration(); + Status refreshStatus = balancerConfig->refreshAndCheck(txn.get()); + if (!refreshStatus.isOK()) { + warning() << "Skipping balancing round" << causedBy(refreshStatus); + sleepFor(balanceRoundInterval); + continue; } - const SettingsType& balancerConfig = - isBalSettingsAbsent ? SettingsType{} : balSettingsResult.getValue(); - // now make sure we should even be running - if ((!isBalSettingsAbsent && !Chunk::shouldBalance(balancerConfig)) || - MONGO_FAIL_POINT(skipBalanceRound)) { + if (!balancerConfig->isBalancerActive() || MONGO_FAIL_POINT(skipBalanceRound)) { LOG(1) << "skipping balancing round because balancing is disabled"; // Ping again so scripts can determine if we're active without waiting @@ -254,22 +244,10 @@ void Balancer::run() { continue; } - const bool waitForDelete = - (balancerConfig.isWaitForDeleteSet() ? balancerConfig.getWaitForDelete() - : false); - - MigrationSecondaryThrottleOptions secondaryThrottle( - MigrationSecondaryThrottleOptions::create( - MigrationSecondaryThrottleOptions::kDefault)); - if (balancerConfig.isKeySet()) { - secondaryThrottle = - uassertStatusOK(MigrationSecondaryThrottleOptions::createFromBalancerConfig( - balancerConfig.toBSON())); - } - LOG(1) << "*** start balancing round. " - << "waitForDelete: " << waitForDelete - << ", secondaryThrottle: " << secondaryThrottle.toBSON(); + << "waitForDelete: " << balancerConfig->waitForDelete() + << ", secondaryThrottle: " + << balancerConfig->getSecondaryThrottle().toBSON(); const auto candidateChunks = uassertStatusOK(_getCandidateChunks(txn.get())); @@ -277,8 +255,10 @@ void Balancer::run() { LOG(1) << "no need to move any chunk"; _balancedLastTime = 0; } else { - _balancedLastTime = - _moveChunks(txn.get(), candidateChunks, secondaryThrottle, waitForDelete); + _balancedLastTime = _moveChunks(txn.get(), + candidateChunks, + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete()); roundDetails.setSucceeded(static_cast<int>(candidateChunks.size()), _balancedLastTime); @@ -563,21 +543,7 @@ int Balancer::_moveChunks(OperationContext* txn, for (const auto& migrateInfo : candidateChunks) { // If the balancer was disabled since we started this round, don't start new chunks // moves. - const auto balSettingsResult = - grid.catalogManager(txn)->getGlobalSettings(txn, SettingsType::BalancerDocKey); - - const bool isBalSettingsAbsent = - balSettingsResult.getStatus() == ErrorCodes::NoMatchingDocument; - - if (!balSettingsResult.isOK() && !isBalSettingsAbsent) { - warning() << balSettingsResult.getStatus(); - return movedCount; - } - - const SettingsType& balancerConfig = - isBalSettingsAbsent ? SettingsType{} : balSettingsResult.getValue(); - - if ((!isBalSettingsAbsent && !Chunk::shouldBalance(balancerConfig)) || + if (!Grid::get(txn)->getBalancerConfiguration()->isBalancerActive() || MONGO_FAIL_POINT(skipBalanceRound)) { LOG(1) << "Stopping balancing round early as balancing was disabled"; return movedCount; @@ -630,7 +596,7 @@ int Balancer::_moveChunks(OperationContext* txn, BSONObj res; if (c->moveAndCommit(txn, migrateInfo.to, - Chunk::MaxChunkSize, + Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), secondaryThrottle, waitForDelete, 0, /* maxTimeMS */ diff --git a/src/mongo/s/balancer/balancer_configuration.cpp b/src/mongo/s/balancer/balancer_configuration.cpp new file mode 100644 index 00000000000..67348fa5fc7 --- /dev/null +++ b/src/mongo/s/balancer/balancer_configuration.cpp @@ -0,0 +1,189 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/balancer/balancer_configuration.h" + +#include "mongo/base/status.h" +#include "mongo/s/catalog/catalog_manager.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +BalancerConfiguration::BalancerConfiguration(uint64_t defaultMaxChunkSizeBytes) + : _secondaryThrottle( + MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault)), + _defaultMaxChunkSizeBytes(defaultMaxChunkSizeBytes) { + invariant(checkMaxChunkSizeValid(defaultMaxChunkSizeBytes)); + + _useDefaultBalancerSettings(); + _useDefaultChunkSizeSettings(); +} + +BalancerConfiguration::~BalancerConfiguration() = default; + +bool BalancerConfiguration::isBalancerActive() const { + if (!_shouldBalance.loadRelaxed()) { + return false; + } + + stdx::lock_guard<stdx::mutex> lk(_balancerSettingsMutex); + if (_balancerSettings.isBalancerActiveWindowSet()) { + return _balancerSettings.inBalancingWindow(boost::posix_time::second_clock::local_time()); + } + + return true; +} + +MigrationSecondaryThrottleOptions BalancerConfiguration::getSecondaryThrottle() const { + stdx::lock_guard<stdx::mutex> lk(_balancerSettingsMutex); + return _secondaryThrottle; +} + +bool BalancerConfiguration::waitForDelete() const { + stdx::lock_guard<stdx::mutex> lk(_balancerSettingsMutex); + return _waitForDelete; +} + +Status BalancerConfiguration::refreshAndCheck(OperationContext* txn) { + // Balancer configuration + Status balancerSettingsStatus = _refreshBalancerSettings(txn); + if (!balancerSettingsStatus.isOK()) { + return {balancerSettingsStatus.code(), + str::stream() << "Failed to refresh the balancer settings due to " + << balancerSettingsStatus.toString()}; + } + + // Chunk size settings + Status chunkSizeStatus = _refreshChunkSizeSettings(txn); + if (!chunkSizeStatus.isOK()) { + return {chunkSizeStatus.code(), + str::stream() << "Failed to refresh the chunk sizes settings due to " + << chunkSizeStatus.toString()}; + } + + return Status::OK(); +} + +bool BalancerConfiguration::checkMaxChunkSizeValid(uint64_t maxChunkSize) { + if (maxChunkSize >= (1024 * 1024) && maxChunkSize <= (1024 * 1024 * 1024)) { + return true; + } + + return false; +} + +Status BalancerConfiguration::_refreshBalancerSettings(OperationContext* txn) { + SettingsType balancerSettings; + + auto balanceSettingsStatus = + Grid::get(txn)->catalogManager(txn)->getGlobalSettings(txn, SettingsType::BalancerDocKey); + if (balanceSettingsStatus.isOK()) { + balancerSettings = std::move(balanceSettingsStatus.getValue()); + } else if (balanceSettingsStatus.getStatus() != ErrorCodes::NoMatchingDocument) { + return balanceSettingsStatus.getStatus(); + } else { + _useDefaultBalancerSettings(); + return Status::OK(); + } + + if (balancerSettings.isBalancerStoppedSet() && balancerSettings.getBalancerStopped()) { + _shouldBalance.store(false); + } else { + _shouldBalance.store(true); + } + + stdx::lock_guard<stdx::mutex> lk(_balancerSettingsMutex); + _balancerSettings = std::move(balancerSettings); + + if (_balancerSettings.isKeySet()) { + _secondaryThrottle = + uassertStatusOK(MigrationSecondaryThrottleOptions::createFromBalancerConfig( + _balancerSettings.toBSON())); + } else { + _secondaryThrottle = + MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault); + } + + if (_balancerSettings.isWaitForDeleteSet() && _balancerSettings.getWaitForDelete()) { + _waitForDelete = true; + } else { + _waitForDelete = false; + } + + return Status::OK(); +} + +void BalancerConfiguration::_useDefaultBalancerSettings() { + _shouldBalance.store(true); + _balancerSettings = SettingsType{}; + _waitForDelete = false; + _secondaryThrottle = + MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault); +} + +Status BalancerConfiguration::_refreshChunkSizeSettings(OperationContext* txn) { + SettingsType chunkSizeSettings; + + auto chunkSizeSettingsStatus = + grid.catalogManager(txn)->getGlobalSettings(txn, SettingsType::ChunkSizeDocKey); + if (chunkSizeSettingsStatus.isOK()) { + chunkSizeSettings = std::move(chunkSizeSettingsStatus.getValue()); + } else if (chunkSizeSettingsStatus.getStatus() != ErrorCodes::NoMatchingDocument) { + return chunkSizeSettingsStatus.getStatus(); + } else { + _useDefaultChunkSizeSettings(); + return Status::OK(); + } + + const uint64_t newMaxChunkSizeBytes = chunkSizeSettings.getChunkSizeMB() * 1024 * 1024; + + if (!checkMaxChunkSizeValid(newMaxChunkSizeBytes)) { + return {ErrorCodes::BadValue, + str::stream() << chunkSizeSettings.getChunkSizeMB() + << " is not a valid value for MaxChunkSize"}; + } + + if (newMaxChunkSizeBytes != getMaxChunkSizeBytes()) { + log() << "MaxChunkSize changing from " << getMaxChunkSizeBytes() / (1024 * 1024) << "MB" + << " to " << newMaxChunkSizeBytes / (1024 * 1024) << "MB"; + } + + return Status::OK(); +} + +void BalancerConfiguration::_useDefaultChunkSizeSettings() { + _maxChunkSizeBytes.store(_defaultMaxChunkSizeBytes); +} + +} // namespace mongo diff --git a/src/mongo/s/balancer/balancer_configuration.h b/src/mongo/s/balancer/balancer_configuration.h new file mode 100644 index 00000000000..b3ac83c0ff4 --- /dev/null +++ b/src/mongo/s/balancer/balancer_configuration.h @@ -0,0 +1,144 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <cstdint> + +#include "mongo/base/disallow_copying.h" +#include "mongo/s/catalog/type_settings.h" +#include "mongo/s/migration_secondary_throttle_options.h" +#include "mongo/stdx/mutex.h" +#include "mongo/platform/atomic_word.h" + +namespace mongo { + +class MigrationSecondaryThrottleOptions; +class OperationContext; +class Status; + +/** + * Contains settings, which control the behaviour of the balancer. + */ +class BalancerConfiguration { + MONGO_DISALLOW_COPYING(BalancerConfiguration); + +public: + // Default value used for the max chunk size if one is not specified in the balancer + // configuration + static const uint64_t kDefaultMaxChunkSizeBytes{64 * 1024 * 1024}; + + /** + * Primes the balancer configuration with some default values. These settings may change at a + * later time after a call to refresh(). + */ + BalancerConfiguration(uint64_t defaultMaxChunkSizeBytes); + ~BalancerConfiguration(); + + /** + * Returns whether balancing is allowed based on both the enabled state of the balancer and the + * balancing window. + */ + bool isBalancerActive() const; + + /** + * Returns the secondary throttle options for the balancer. + */ + MigrationSecondaryThrottleOptions getSecondaryThrottle() const; + + /** + * Returns whether the balancer should wait for deletion of orphaned chunk data at the end of + * each migration. + */ + bool waitForDelete() const; + + /** + * Returns the max chunk size after which a chunk would be considered jumbo. + */ + uint64_t getMaxChunkSizeBytes() const { + return _maxChunkSizeBytes.loadRelaxed(); + } + + /** + * Blocking method, which refreshes the balancer configuration from the settings in the + * config.settings collection. It will stop at the first bad configuration value and return an + * error indicating what failed. + * + * This method is thread-safe but it doesn't make sense to be called from more than one thread + * at a time. + */ + Status refreshAndCheck(OperationContext* txn); + + /** + * Validates that the specified max chunk size value (in bytes) is allowed. + */ + static bool checkMaxChunkSizeValid(uint64_t maxChunkSizeBytes); + +private: + /** + * Reloads the balancer configuration from the settings document. Fails if the settings document + * cannot be read, in which case the values will remain unchanged. + */ + Status _refreshBalancerSettings(OperationContext* txn); + + /** + * If the balancer settings document is missing, these are the defaults, which will be used. + */ + void _useDefaultBalancerSettings(); + + /** + * Reloads the chunk sizes configuration from the settings document. Fails if the settings + * document cannot be read or if any setting contains invalid value, in which case the offending + * value will remain unchanged. + */ + Status _refreshChunkSizeSettings(OperationContext* txn); + + /** + * If the chunk size settings document is missing, these are the defaults, which will be used. + */ + void _useDefaultChunkSizeSettings(); + + // Whether auto-balancing of chunks should happen + AtomicBool _shouldBalance{true}; + + // The latest read balancer settings (used for the balancer window and secondary throttle) and a + // mutex to protect its changes + mutable stdx::mutex _balancerSettingsMutex; + SettingsType _balancerSettings; + bool _waitForDelete{false}; + MigrationSecondaryThrottleOptions _secondaryThrottle; + + // Default value for use for the max chunk size if the setting is not present in the balancer + // configuration + const uint64_t _defaultMaxChunkSizeBytes; + + // Max chunk size after which a chunk would be considered jumbo and won't be moved + AtomicUInt64 _maxChunkSizeBytes; +}; + +} // namespace mongo diff --git a/src/mongo/s/catalog/dist_lock_catalog.h b/src/mongo/s/catalog/dist_lock_catalog.h index cbb59f75f94..e6537546398 100644 --- a/src/mongo/s/catalog/dist_lock_catalog.h +++ b/src/mongo/s/catalog/dist_lock_catalog.h @@ -163,4 +163,5 @@ public: */ virtual Status stopPing(OperationContext* txn, StringData processId) = 0; }; -} + +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp index d3e85b9d5c2..57db37a563f 100644 --- a/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp +++ b/src/mongo/s/catalog/replset/catalog_manager_replica_set_shard_collection_test.cpp @@ -41,6 +41,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set_test_fixture.h" @@ -51,7 +52,6 @@ #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_factory_mock.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/chunk.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -686,7 +686,8 @@ TEST_F(ShardCollectionTest, withInitialData) { ASSERT_EQUALS(keyPattern.toBSON(), request.cmdObj["keyPattern"].Obj()); ASSERT_EQUALS(keyPattern.getKeyPattern().globalMin(), request.cmdObj["min"].Obj()); ASSERT_EQUALS(keyPattern.getKeyPattern().globalMax(), request.cmdObj["max"].Obj()); - ASSERT_EQUALS(Chunk::MaxChunkSize, request.cmdObj["maxChunkSizeBytes"].numberLong()); + ASSERT_EQUALS(BalancerConfiguration::kDefaultMaxChunkSizeBytes, + static_cast<uint64_t>(request.cmdObj["maxChunkSizeBytes"].numberLong())); ASSERT_EQUALS(0, request.cmdObj["maxSplitPoints"].numberLong()); ASSERT_EQUALS(0, request.cmdObj["maxChunkObjects"].numberLong()); diff --git a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp index f6f2a517486..715cd092d23 100644 --- a/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp +++ b/src/mongo/s/catalog/replset/dist_lock_catalog_impl_test.cpp @@ -42,6 +42,7 @@ #include "mongo/executor/network_interface_mock.h" #include "mongo/executor/network_test_env.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager_mock.h" #include "mongo/s/catalog/replset/dist_lock_catalog_impl.h" @@ -137,6 +138,7 @@ private: stdx::make_unique<CatalogCache>(), std::move(shardRegistry), std::unique_ptr<ClusterCursorManager>{nullptr}, + std::unique_ptr<BalancerConfiguration>{nullptr}, std::move(executorPool), network); diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index cd3b226db65..5397cb8daa2 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -43,10 +43,11 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/balance.h" #include "mongo/s/balancer_policy.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/balancer/cluster_statistics.h" #include "mongo/s/catalog/catalog_manager.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog/type_settings.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_connection.h" @@ -69,7 +70,7 @@ using std::vector; namespace { -const int kTooManySplitPoints = 4; +const uint64_t kTooManySplitPoints = 4; /** * Attempts to move the given chunk to another shard. @@ -144,7 +145,7 @@ bool tryMoveToOtherShard(OperationContext* txn, if (!toMove->moveAndCommit( txn, newShard->getId(), - Chunk::MaxChunkSize, + Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff), false, /* waitForDelete - small chunk, no need */ 0, /* maxTimeMS - don't time out */ @@ -160,17 +161,11 @@ bool tryMoveToOtherShard(OperationContext* txn, } // namespace -long long Chunk::MaxChunkSize = 1024 * 1024 * 64; - -// Can be overridden from command line -bool Chunk::ShouldAutoSplit = true; - -Chunk::Chunk(OperationContext* txn, const ChunkManager* manager, const ChunkType& from) - : _manager(manager), _lastmod(0, 0, OID()), _dataWritten(mkDataWritten()) { +Chunk::Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from) + : _manager(manager), _lastmod(from.getVersion()), _dataWritten(mkDataWritten()) { string ns = from.getNS(); _shardId = from.getShard(); - _lastmod = from.getVersion(); verify(_lastmod.isSet()); _min = from.getMin().getOwned(); @@ -185,22 +180,24 @@ Chunk::Chunk(OperationContext* txn, const ChunkManager* manager, const ChunkType uassert(10171, "Chunk needs a server", grid.shardRegistry()->getShard(txn, _shardId)); } -Chunk::Chunk(const ChunkManager* info, +Chunk::Chunk(ChunkManager* info, const BSONObj& min, const BSONObj& max, const ShardId& shardId, - ChunkVersion lastmod) + ChunkVersion lastmod, + uint64_t initialDataWritten) : _manager(info), _min(min), _max(max), _shardId(shardId), _lastmod(lastmod), _jumbo(false), - _dataWritten(mkDataWritten()) {} + _dataWritten(initialDataWritten) {} int Chunk::mkDataWritten() { PseudoRandom r(static_cast<int64_t>(time(0))); - return r.nextInt32(MaxChunkSize / ChunkManager::SplitHeuristics::splitTestFactor); + return r.nextInt32(grid.getBalancerConfiguration()->getMaxChunkSizeBytes() / + ChunkManager::SplitHeuristics::splitTestFactor); } bool Chunk::containsKey(const BSONObj& shardKey) const { @@ -212,41 +209,6 @@ bool ChunkRange::containsKey(const BSONObj& shardKey) const { return getMin().woCompare(shardKey) <= 0 && shardKey.woCompare(getMax()) < 0; } -bool Chunk::shouldBalance(const SettingsType& balancerSettings) { - if (balancerSettings.isBalancerStoppedSet() && balancerSettings.getBalancerStopped()) { - return false; - } - - if (balancerSettings.isBalancerActiveWindowSet()) { - boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); - return balancerSettings.inBalancingWindow(now); - } - - return true; -} - -bool Chunk::getConfigShouldBalance(OperationContext* txn) const { - auto balSettingsResult = - grid.catalogManager(txn)->getGlobalSettings(txn, SettingsType::BalancerDocKey); - if (!balSettingsResult.isOK()) { - if (balSettingsResult == ErrorCodes::NoMatchingDocument) { - // Settings document for balancer does not exist, default to balancing allowed. - return true; - } - - warning() << balSettingsResult.getStatus(); - return false; - } - SettingsType balSettings = balSettingsResult.getValue(); - - if (!balSettings.isKeySet()) { - // Balancer settings doc does not exist. Default to yes. - return true; - } - - return shouldBalance(balSettings); -} - bool Chunk::_minIsInf() const { return 0 == _manager->getShardKeyPattern().getKeyPattern().globalMin().woCompare(getMin()); } @@ -325,16 +287,17 @@ std::vector<BSONObj> Chunk::_determineSplitPoints(OperationContext* txn, bool at splitPoints.push_back(medianKey); } } else { - long long chunkSize = _manager->getCurrentDesiredChunkSize(); + uint64_t chunkSize = _manager->getCurrentDesiredChunkSize(); // Note: One split point for every 1/2 chunk size. - const int estNumSplitPoints = _dataWritten / chunkSize * 2; + const uint64_t estNumSplitPoints = _dataWritten / chunkSize * 2; if (estNumSplitPoints >= kTooManySplitPoints) { // The current desired chunk size will split the chunk into lots of small chunk and at // the worst case this can result into thousands of chunks. So check and see if a bigger // value can be used. - chunkSize = std::min(_dataWritten, Chunk::MaxChunkSize); + chunkSize = std::min( + _dataWritten, Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes()); } splitPoints = @@ -507,19 +470,19 @@ bool Chunk::moveAndCommit(OperationContext* txn, return worked; } -bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) const { - dassert(ShouldAutoSplit); +bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) { LastError::Disabled d(&LastError::get(cc())); try { _dataWritten += dataWritten; - int splitThreshold = getManager()->getCurrentDesiredChunkSize(); + uint64_t splitThreshold = getManager()->getCurrentDesiredChunkSize(); if (_minIsInf() || _maxIsInf()) { - splitThreshold = (int)((double)splitThreshold * .9); + splitThreshold = static_cast<uint64_t>((double)splitThreshold * 0.9); } - if (_dataWritten < splitThreshold / ChunkManager::SplitHeuristics::splitTestFactor) + if (_dataWritten < splitThreshold / ChunkManager::SplitHeuristics::splitTestFactor) { return false; + } if (!getManager()->_splitHeuristics._splitTickets.tryAcquire()) { LOG(1) << "won't auto split because not enough tickets: " << getManager()->getns(); @@ -556,16 +519,23 @@ bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) const { _dataWritten = 0; } - bool shouldBalance = getConfigShouldBalance(txn); + Status refreshStatus = Grid::get(txn)->getBalancerConfiguration()->refreshAndCheck(txn); + if (!refreshStatus.isOK()) { + warning() << "Unable to refresh balancer settings" << causedBy(refreshStatus); + return false; + } + + bool shouldBalance = Grid::get(txn)->getBalancerConfiguration()->isBalancerActive(); if (shouldBalance) { - auto status = grid.catalogManager(txn)->getCollection(txn, _manager->getns()); - if (!status.isOK()) { - log() << "Auto-split for " << _manager->getns() - << " failed to load collection metadata due to " << status.getStatus(); + auto collStatus = grid.catalogManager(txn)->getCollection(txn, _manager->getns()); + if (!collStatus.isOK()) { + warning() << "Auto-split for " << _manager->getns() + << " failed to load collection metadata" + << causedBy(collStatus.getStatus()); return false; } - shouldBalance = status.getValue().value.getAllowBalance(); + shouldBalance = collStatus.getValue().value.getAllowBalance(); } log() << "autosplitted " << _manager->getns() << " shard: " << toString() << " into " @@ -592,7 +562,6 @@ bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) const { } return true; - } catch (DBException& e) { // TODO: Make this better - there are lots of reasons a split could fail // Random so that we don't sync up with other failed splits @@ -647,35 +616,4 @@ void Chunk::markAsJumbo(OperationContext* txn) const { } } -void Chunk::refreshChunkSize(OperationContext* txn) { - auto chunkSizeSettingsResult = - grid.catalogManager(txn)->getGlobalSettings(txn, SettingsType::ChunkSizeDocKey); - if (!chunkSizeSettingsResult.isOK()) { - log() << chunkSizeSettingsResult.getStatus(); - return; - } - SettingsType chunkSizeSettings = chunkSizeSettingsResult.getValue(); - int csize = chunkSizeSettings.getChunkSizeMB(); - - LOG(1) << "Refreshing MaxChunkSize: " << csize << "MB"; - - if (csize != Chunk::MaxChunkSize / (1024 * 1024)) { - log() << "MaxChunkSize changing from " << Chunk::MaxChunkSize / (1024 * 1024) << "MB" - << " to " << csize << "MB"; - } - - if (!setMaxChunkSizeSizeMB(csize)) { - warning() << "invalid MaxChunkSize: " << csize; - } -} - -bool Chunk::setMaxChunkSizeSizeMB(int newMaxChunkSize) { - if (newMaxChunkSize < 1) - return false; - if (newMaxChunkSize > 1024) - return false; - MaxChunkSize = newMaxChunkSize * 1024 * 1024; - return true; -} - } // namespace mongo diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index 553c0bbec9d..2f477f93a8c 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -28,14 +28,13 @@ #pragma once -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/catalog/type_settings.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard.h" namespace mongo { class ChunkManager; +class ChunkType; class MigrationSecondaryThrottleOptions; class OperationContext; @@ -64,12 +63,14 @@ public: autoSplitInternal }; - Chunk(OperationContext* txn, const ChunkManager* info, const ChunkType& from); - Chunk(const ChunkManager* info, + Chunk(OperationContext* txn, ChunkManager* manager, const ChunkType& from); + + Chunk(ChunkManager* manager, const BSONObj& min, const BSONObj& max, const ShardId& shardId, - ChunkVersion lastmod = ChunkVersion()); + ChunkVersion lastmod, + uint64_t initialDataWritten); // // chunk boundary support @@ -78,21 +79,11 @@ public: const BSONObj& getMin() const { return _min; } + const BSONObj& getMax() const { return _max; } - /** - * Returns true if the balancer should be running. Caller is responsible for making sure - * settings has the balancer key. - */ - static bool shouldBalance(const SettingsType& balancerSettings); - - /** - * Returns true if the config server settings indicate that the balancer should be active. - */ - bool getConfigShouldBalance(OperationContext* txn) const; - // Returns true if this chunk contains the given shard key, and false otherwise // // Note: this function takes an extracted *key*, not an original document @@ -109,9 +100,6 @@ public: ChunkVersion getLastmod() const { return _lastmod; } - void setLastmod(ChunkVersion v) { - _lastmod = v; - } // // split support @@ -120,18 +108,13 @@ public: long long getBytesWritten() const { return _dataWritten; } - // Const since _dataWritten is mutable and a heuristic - // TODO: Split data tracking and chunk information - void setBytesWritten(long long bytesWritten) const { - _dataWritten = bytesWritten; - } /** * if the amount of data written nears the max size of a shard * then we check the real size, and if its too big, we split * @return if something was split */ - bool splitIfShould(OperationContext* txn, long dataWritten) const; + bool splitIfShould(OperationContext* txn, long dataWritten); /** * Splits this chunk at a non-specificed split key to be chosen by the @@ -193,25 +176,11 @@ public: return _jumbo; } - /** - * Attempt to refresh maximum chunk size from config. - */ - static void refreshChunkSize(OperationContext* txn); - - /** - * sets MaxChunkSize - * 1 <= newMaxChunkSize <= 1024 - * @return true if newMaxChunkSize is valid and was set - */ - static bool setMaxChunkSizeSizeMB(int newMaxChunkSize); - // // public constants // - static long long MaxChunkSize; static const int MaxObjectPerChunk{250000}; - static bool ShouldAutoSplit; // // accessors and helpers @@ -252,12 +221,11 @@ private: BSONObj _min; BSONObj _max; ShardId _shardId; - ChunkVersion _lastmod; + const ChunkVersion _lastmod; mutable bool _jumbo; - // transient stuff - - mutable long long _dataWritten; + // Statistics for the approximate data written by this chunk + uint64_t _dataWritten; // methods, etc.. diff --git a/src/mongo/s/chunk_manager.cpp b/src/mongo/s/chunk_manager.cpp index 5728cfe07db..3fbb8057b06 100644 --- a/src/mongo/s/chunk_manager.cpp +++ b/src/mongo/s/chunk_manager.cpp @@ -45,6 +45,7 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/query/query_planner_common.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_chunk.h" @@ -244,10 +245,12 @@ bool ChunkManager::_load(OperationContext* txn, // interesting things here for (const auto& oldChunkMapEntry : oldChunkMap) { shared_ptr<Chunk> oldC = oldChunkMapEntry.second; - shared_ptr<Chunk> newC(new Chunk( - this, oldC->getMin(), oldC->getMax(), oldC->getShardId(), oldC->getLastmod())); - - newC->setBytesWritten(oldC->getBytesWritten()); + shared_ptr<Chunk> newC(new Chunk(this, + oldC->getMin(), + oldC->getMax(), + oldC->getShardId(), + oldC->getLastmod(), + oldC->getBytesWritten())); chunkMap.insert(make_pair(oldC->getMax(), newC)); } @@ -367,16 +370,16 @@ void ChunkManager::calcInitSplitsAndShards(OperationContext* txn, uassertStatusOK(bsonExtractIntegerField(result.getValue(), "n", &numObjects)); if (numObjects > 0) { - *splitPoints = uassertStatusOK( - shardutil::selectChunkSplitPoints(txn, - primaryShardId, - NamespaceString(_ns), - _keyPattern, - _keyPattern.getKeyPattern().globalMin(), - _keyPattern.getKeyPattern().globalMax(), - Chunk::MaxChunkSize, - 0, - 0)); + *splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints( + txn, + primaryShardId, + NamespaceString(_ns), + _keyPattern, + _keyPattern.getKeyPattern().globalMin(), + _keyPattern.getKeyPattern().globalMax(), + Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), + 0, + 0)); } // since docs already exists, must use primary shard @@ -814,11 +817,11 @@ void ChunkRangeManager::_insertRange(ChunkMap::const_iterator begin, } } -int ChunkManager::getCurrentDesiredChunkSize() const { +uint64_t ChunkManager::getCurrentDesiredChunkSize() const { // split faster in early chunks helps spread out an initial load better - const int minChunkSize = 1 << 20; // 1 MBytes + const uint64_t minChunkSize = 1 << 20; // 1 MBytes - int splitThreshold = Chunk::MaxChunkSize; + uint64_t splitThreshold = grid.getBalancerConfiguration()->getMaxChunkSizeBytes(); int nc = numChunks(); diff --git a/src/mongo/s/chunk_manager.h b/src/mongo/s/chunk_manager.h index 28b755aa419..8531f5d6ca0 100644 --- a/src/mongo/s/chunk_manager.h +++ b/src/mongo/s/chunk_manager.h @@ -29,17 +29,21 @@ #pragma once #include <map> +#include <set> #include <string> #include <vector> #include "mongo/db/repl/optime.h" #include "mongo/s/chunk.h" +#include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/concurrency/ticketholder.h" namespace mongo { class CanonicalQuery; +class Chunk; class ChunkManager; class CollectionType; struct QuerySolutionNode; @@ -90,7 +94,6 @@ private: typedef std::map<BSONObj, std::shared_ptr<ChunkRange>, BSONObjCmp> ChunkRangeMap; - class ChunkRangeManager { public: const ChunkRangeMap& ranges() const { @@ -245,7 +248,7 @@ public: void _printChunks() const; - int getCurrentDesiredChunkSize() const; + uint64_t getCurrentDesiredChunkSize() const; std::shared_ptr<ChunkManager> reload(OperationContext* txn, bool force = true) const; // doesn't modify self! diff --git a/src/mongo/s/chunk_manager_targeter.cpp b/src/mongo/s/chunk_manager_targeter.cpp index a58d762e26a..a99a233349e 100644 --- a/src/mongo/s/chunk_manager_targeter.cpp +++ b/src/mongo/s/chunk_manager_targeter.cpp @@ -34,6 +34,7 @@ #include <boost/thread/tss.hpp> +#include "mongo/s/chunk.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" @@ -45,7 +46,7 @@ namespace mongo { using std::shared_ptr; -using mongoutils::str::stream; +using str::stream; using std::map; using std::set; using std::string; @@ -512,7 +513,7 @@ Status ChunkManagerTargeter::targetShardKey(OperationContext* txn, ShardEndpoint** endpoint) const { invariant(NULL != _manager); - ChunkPtr chunk = _manager->findIntersectingChunk(txn, shardKey); + shared_ptr<Chunk> chunk = _manager->findIntersectingChunk(txn, shardKey); // Track autosplit stats for sharded collections // Note: this is only best effort accounting and is not accurate. diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index 8fe610978a8..a2400d59250 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -37,6 +37,7 @@ #include "mongo/base/status.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/chunk_manager.h" @@ -45,6 +46,7 @@ #include "mongo/s/config.h" #include "mongo/s/dbclient_shard_resolver.h" #include "mongo/s/grid.h" +#include "mongo/s/mongos_options.h" #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -110,7 +112,7 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) { * Splits the chunks touched based from the targeter stats if needed. */ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const TargeterStats& stats) { - if (!Chunk::ShouldAutoSplit) { + if (!mongosGlobalParams.shouldAutoSplit) { return; } @@ -123,7 +125,7 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ shared_ptr<DBConfig> config = status.getValue(); - ChunkManagerPtr chunkManager; + shared_ptr<ChunkManager> chunkManager; shared_ptr<Shard> dummyShard; config->getChunkManagerOrPrimary(txn, nss.ns(), chunkManager, dummyShard); @@ -134,7 +136,7 @@ void splitIfNeeded(OperationContext* txn, const NamespaceString& nss, const Targ for (map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin(); it != stats.chunkSizeDelta.end(); ++it) { - ChunkPtr chunk; + shared_ptr<Chunk> chunk; try { chunk = chunkManager->findIntersectingChunk(txn, it->first); } catch (const AssertionException& ex) { diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 3ea20a66f1c..995d38a4fa7 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -26,6 +26,7 @@ env.Library( 'cluster_db_stats_cmd.cpp', 'cluster_drop_database_cmd.cpp', 'cluster_enable_sharding_cmd.cpp', + 'cluster_explain.cpp', 'cluster_explain_cmd.cpp', 'cluster_find_cmd.cpp', 'cluster_find_and_modify_cmd.cpp', @@ -61,8 +62,10 @@ env.Library( 'cluster_whats_my_uri_cmd.cpp', 'cluster_write_cmd.cpp', 'commands_public.cpp', + 'request.cpp', 'run_on_all_shards_cmd.cpp', 'sharded_command_processing.cpp', + 'strategy.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/parallel', diff --git a/src/mongo/s/commands/cluster_commands_common.h b/src/mongo/s/commands/cluster_commands_common.h index 68a2f9cdd7f..b759c14c45e 100644 --- a/src/mongo/s/commands/cluster_commands_common.h +++ b/src/mongo/s/commands/cluster_commands_common.h @@ -33,7 +33,7 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/stdx/memory.h" namespace mongo { diff --git a/src/mongo/s/commands/cluster_count_cmd.cpp b/src/mongo/s/commands/cluster_count_cmd.cpp index acd426d7f2f..aafa05cd583 100644 --- a/src/mongo/s/commands/cluster_count_cmd.cpp +++ b/src/mongo/s/commands/cluster_count_cmd.cpp @@ -31,9 +31,9 @@ #include <vector> #include "mongo/db/commands.h" -#include "mongo/s/cluster_explain.h" +#include "mongo/s/commands/cluster_explain.h" #include "mongo/s/commands/cluster_commands_common.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/util/timer.h" namespace mongo { diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp index d75f881c7c7..7cc9ce7e97a 100644 --- a/src/mongo/s/commands/cluster_current_op.cpp +++ b/src/mongo/s/commands/cluster_current_op.cpp @@ -39,7 +39,7 @@ #include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/s/commands/run_on_all_shards_cmd.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" diff --git a/src/mongo/s/cluster_explain.cpp b/src/mongo/s/commands/cluster_explain.cpp index c96618be641..acd915981ef 100644 --- a/src/mongo/s/cluster_explain.cpp +++ b/src/mongo/s/commands/cluster_explain.cpp @@ -32,7 +32,7 @@ #include "mongo/db/query/lite_parsed_query.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/cluster_explain.h" +#include "mongo/s/commands/cluster_explain.h" #include "mongo/s/grid.h" namespace mongo { diff --git a/src/mongo/s/cluster_explain.h b/src/mongo/s/commands/cluster_explain.h index 163388e62ff..e353407a140 100644 --- a/src/mongo/s/cluster_explain.h +++ b/src/mongo/s/commands/cluster_explain.h @@ -31,7 +31,7 @@ #include <string> #include "mongo/db/query/explain_common.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/s/write_ops/batched_command_request.h" namespace mongo { diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 2400d23931a..2d34c22ddf0 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -35,17 +35,19 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/commands/find_and_modify.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/config.h" #include "mongo/s/chunk_manager.h" -#include "mongo/s/cluster_explain.h" +#include "mongo/s/commands/cluster_explain.h" #include "mongo/s/db_util.h" #include "mongo/s/grid.h" +#include "mongo/s/mongos_options.h" #include "mongo/s/stale_exception.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/s/write_ops/wc_error_detail.h" #include "mongo/util/timer.h" @@ -87,7 +89,7 @@ public: BSONObjBuilder* out) const { const string ns = parseNsCollectionRequired(dbName, cmdObj); - auto status = grid.catalogCache()->getDatabase(txn, dbName); + auto status = Grid::get(txn)->catalogCache()->getDatabase(txn, dbName); uassertStatusOK(status); shared_ptr<DBConfig> conf = status.getValue(); @@ -95,7 +97,7 @@ public: shared_ptr<Shard> shard; if (!conf->isShardingEnabled() || !conf->isSharded(ns)) { - shard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); + shard = Grid::get(txn)->shardRegistry()->getShard(txn, conf->getPrimaryId()); } else { chunkMgr = _getChunkManager(txn, conf, ns); @@ -107,9 +109,9 @@ public: } BSONObj shardKey = status.getValue(); - ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey); + shared_ptr<Chunk> chunk = chunkMgr->findIntersectingChunk(txn, shardKey); - shard = grid.shardRegistry()->getShard(txn, chunk->getShardId()); + shard = Grid::get(txn)->shardRegistry()->getShard(txn, chunk->getShardId()); } BSONObjBuilder explainCmd; @@ -168,12 +170,12 @@ public: } BSONObj shardKey = status.getValue(); - ChunkPtr chunk = chunkMgr->findIntersectingChunk(txn, shardKey); + shared_ptr<Chunk> chunk = chunkMgr->findIntersectingChunk(txn, shardKey); bool ok = _runCommand(txn, conf, chunkMgr, chunk->getShardId(), ns, cmdObj, result); if (ok) { // check whether split is necessary (using update object for size heuristic) - if (Chunk::ShouldAutoSplit) { + if (mongosGlobalParams.shouldAutoSplit) { chunk->splitIfShould(txn, cmdObj.getObjectField("update").objsize()); } } @@ -219,7 +221,7 @@ private: BSONObjBuilder& result) const { BSONObj res; - const auto shard = grid.shardRegistry()->getShard(txn, shardId); + const auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, shardId); ShardConnection conn(shard->getConnString(), ns, chunkManager); bool ok = conn->runCommand(conf->name(), cmdObj, res); conn.done(); diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index cc37be5606b..ef9d44860a8 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -37,7 +37,7 @@ #include "mongo/db/query/cursor_response.h" #include "mongo/db/stats/counters.h" #include "mongo/s/query/cluster_find.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" namespace mongo { namespace { diff --git a/src/mongo/s/commands/cluster_index_filter_cmd.cpp b/src/mongo/s/commands/cluster_index_filter_cmd.cpp index 5a342cba195..14e8562024c 100644 --- a/src/mongo/s/commands/cluster_index_filter_cmd.cpp +++ b/src/mongo/s/commands/cluster_index_filter_cmd.cpp @@ -32,7 +32,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" namespace mongo { diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index d016e7713a4..06449e4d5a7 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -39,10 +39,10 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" #include "mongo/db/commands/mr.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/client/shard_connection.h" -#include "mongo/s/chunk.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/commands/sharded_command_processing.h" @@ -51,7 +51,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/db_util.h" #include "mongo/s/grid.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/s/write_ops/wc_error_detail.h" #include "mongo/stdx/chrono.h" #include "mongo/util/log.h" @@ -166,7 +166,6 @@ public: return false; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return mr::mrSupportsWriteConcern(cmd); } @@ -262,7 +261,8 @@ public: // Will need to figure out chunks, ask shards for points maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong(); if (maxChunkSizeBytes == 0) { - maxChunkSizeBytes = Chunk::MaxChunkSize; + maxChunkSizeBytes = + Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(); } // maxChunkSizeBytes is sent as int BSON field @@ -549,8 +549,7 @@ public: } // Do the splitting round - ChunkManagerPtr cm = confOut->getChunkManagerIfExists(txn, outputCollNss.ns()); - + shared_ptr<ChunkManager> cm = confOut->getChunkManagerIfExists(txn, outputCollNss.ns()); uassert(34359, str::stream() << "Failed to write mapreduce output to " << outputCollNss.ns() << "; expected that collection to be sharded, but it was not", @@ -562,7 +561,7 @@ public: invariant(size < std::numeric_limits<int>::max()); // key reported should be the chunk's minimum - ChunkPtr c = cm->findIntersectingChunk(txn, key); + shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, key); if (!c) { warning() << "Mongod reported " << size << " bytes inserted for key " << key << " but can't find chunk"; diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index eebfa60b33f..430320c26d1 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -38,6 +38,7 @@ #include "mongo/db/client_basic.h" #include "mongo/db/commands.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" @@ -68,7 +69,6 @@ public: return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } @@ -152,7 +152,7 @@ public: // so far, chunk size serves test purposes; it may or may not become a supported parameter long long maxChunkSizeBytes = cmdObj["maxChunkSizeBytes"].numberLong(); if (maxChunkSizeBytes == 0) { - maxChunkSizeBytes = Chunk::MaxChunkSize; + maxChunkSizeBytes = Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(); } BSONObj find = cmdObj.getObjectField("find"); diff --git a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp index fe9bbad0b14..2ae15e1610f 100644 --- a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp +++ b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp @@ -34,7 +34,7 @@ #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" namespace mongo { diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index b6b99fd9fe4..12d20ce8860 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -44,6 +44,7 @@ #include "mongo/db/commands.h" #include "mongo/db/hasher.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/chunk_manager.h" @@ -76,7 +77,6 @@ public: return true; } - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } @@ -448,14 +448,15 @@ public: BSONObj moveResult; WriteConcernOptions noThrottle; - if (!chunk->moveAndCommit(txn, - to->getId(), - Chunk::MaxChunkSize, - MigrationSecondaryThrottleOptions::create( - MigrationSecondaryThrottleOptions::kOff), - true, - 0, - moveResult)) { + if (!chunk->moveAndCommit( + txn, + to->getId(), + Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), + MigrationSecondaryThrottleOptions::create( + MigrationSecondaryThrottleOptions::kOff), + true, + 0, + moveResult)) { warning() << "couldn't move chunk " << chunk->toString() << " to shard " << *to << " while sharding collection " << ns << "." << " Reason: " << moveResult; diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index afc5b126013..470c5c2d789 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -40,9 +40,9 @@ #include "mongo/s/chunk_manager_targeter.h" #include "mongo/s/client/dbclient_multi_command.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/cluster_explain.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/cluster_write.h" +#include "mongo/s/commands/cluster_explain.h" #include "mongo/s/dbclient_shard_resolver.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batch_upconvert.h" diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 1af094c366f..8e248b45d49 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -49,9 +49,9 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/cluster_explain.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/cluster_commands_common.h" +#include "mongo/s/commands/cluster_explain.h" #include "mongo/s/commands/run_on_all_shards_cmd.h" #include "mongo/s/commands/sharded_command_processing.h" #include "mongo/s/config.h" diff --git a/src/mongo/s/request.cpp b/src/mongo/s/commands/request.cpp index a396b0d7b86..dff8a8472b6 100644 --- a/src/mongo/s/request.cpp +++ b/src/mongo/s/commands/request.cpp @@ -32,7 +32,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/request.h" +#include "mongo/s/commands/request.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" @@ -41,7 +41,7 @@ #include "mongo/db/stats/counters.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/grid.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/util/log.h" #include "mongo/util/timer.h" diff --git a/src/mongo/s/request.h b/src/mongo/s/commands/request.h index 5664474e845..5664474e845 100644 --- a/src/mongo/s/request.h +++ b/src/mongo/s/commands/request.h diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 465bbe10b61..bcece3548c0 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/strategy.h" +#include "mongo/s/commands/strategy.h" #include "mongo/base/data_cursor.h" #include "mongo/base/owned_pointer_vector.h" @@ -58,14 +58,14 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/version_manager.h" -#include "mongo/s/cluster_explain.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/commands/cluster_explain.h" +#include "mongo/s/commands/request.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_find.h" -#include "mongo/s/request.h" #include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batch_upconvert.h" @@ -533,4 +533,5 @@ Status Strategy::explainFind(OperationContext* txn, return ClusterExplain::buildExplainResult( txn, shardResults, mongosStageName, millisElapsed, out); } -} + +} // namespace mongo diff --git a/src/mongo/s/strategy.h b/src/mongo/s/commands/strategy.h index 961387c8db4..630373b71a1 100644 --- a/src/mongo/s/strategy.h +++ b/src/mongo/s/commands/strategy.h @@ -33,12 +33,13 @@ #include "mongo/db/query/explain_common.h" #include "mongo/client/connection_string.h" #include "mongo/s/client/shard.h" -#include "mongo/s/request.h" namespace mongo { class LiteParsedQuery; class OperationContext; +class QueryMessage; +class Request; namespace rpc { class ServerSelectionMetadata; diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index acb07fd1e0f..4bb86075ea3 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -37,16 +37,13 @@ #include "mongo/db/lasterror.h" #include "mongo/db/operation_context.h" #include "mongo/db/write_concern.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_database.h" -#include "mongo/s/catalog/type_lockpings.h" -#include "mongo/s/catalog/type_locks.h" -#include "mongo/s/catalog/type_settings.h" #include "mongo/s/catalog/type_shard.h" -#include "mongo/s/catalog/type_tags.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_registry.h" @@ -265,7 +262,7 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManagerIfExists(OperationContext return getChunkManager(txn, ns, shouldReload, forceReload); } catch (AssertionException& e) { warning() << "chunk manager not found for " << ns << causedBy(e); - return ChunkManagerPtr(); + return nullptr; } } @@ -275,7 +272,7 @@ std::shared_ptr<ChunkManager> DBConfig::getChunkManager(OperationContext* txn, bool forceReload) { BSONObj key; ChunkVersion oldVersion; - ChunkManagerPtr oldManager; + std::shared_ptr<ChunkManager> oldManager; const auto currentReloadIteration = _reloadCount.load(); @@ -696,87 +693,6 @@ void DBConfig::getAllShardedCollections(set<string>& namespaces) { /* --- ConfigServer ---- */ -void ConfigServer::reloadSettings(OperationContext* txn) { - auto catalogManager = grid.catalogManager(txn); - auto chunkSizeResult = catalogManager->getGlobalSettings(txn, SettingsType::ChunkSizeDocKey); - if (chunkSizeResult.isOK()) { - const int csize = chunkSizeResult.getValue().getChunkSizeMB(); - LOG(1) << "Found MaxChunkSize: " << csize; - - if (!Chunk::setMaxChunkSizeSizeMB(csize)) { - warning() << "invalid chunksize: " << csize; - } - } else if (chunkSizeResult.getStatus() == ErrorCodes::NoMatchingDocument) { - const int chunkSize = Chunk::MaxChunkSize / (1024 * 1024); - Status result = grid.catalogManager(txn)->insertConfigDocument( - txn, - SettingsType::ConfigNS, - BSON(SettingsType::key(SettingsType::ChunkSizeDocKey) - << SettingsType::chunkSizeMB(chunkSize))); - if (!result.isOK()) { - warning() << "couldn't set chunkSize on config db" << causedBy(result); - } - } else { - warning() << "couldn't load settings on config db: " << chunkSizeResult.getStatus(); - } - - // indexes - const bool unique = true; - - Status result = clusterCreateIndex( - txn, ChunkType::ConfigNS, BSON(ChunkType::ns() << 1 << ChunkType::min() << 1), unique); - if (!result.isOK()) { - warning() << "couldn't create ns_1_min_1 index on config db" << causedBy(result); - } - - result = clusterCreateIndex( - txn, - ChunkType::ConfigNS, - BSON(ChunkType::ns() << 1 << ChunkType::shard() << 1 << ChunkType::min() << 1), - unique); - if (!result.isOK()) { - warning() << "couldn't create ns_1_shard_1_min_1 index on config db" << causedBy(result); - } - - result = clusterCreateIndex(txn, - ChunkType::ConfigNS, - BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1), - unique); - if (!result.isOK()) { - warning() << "couldn't create ns_1_lastmod_1 index on config db" << causedBy(result); - } - - result = clusterCreateIndex(txn, ShardType::ConfigNS, BSON(ShardType::host() << 1), unique); - if (!result.isOK()) { - warning() << "couldn't create host_1 index on config db" << causedBy(result); - } - - result = clusterCreateIndex(txn, LocksType::ConfigNS, BSON(LocksType::lockID() << 1), !unique); - if (!result.isOK()) { - warning() << "couldn't create lock id index on config db" << causedBy(result); - } - - result = clusterCreateIndex(txn, - LocksType::ConfigNS, - BSON(LocksType::state() << 1 << LocksType::process() << 1), - !unique); - if (!result.isOK()) { - warning() << "couldn't create state and process id index on config db" << causedBy(result); - } - - result = - clusterCreateIndex(txn, LockpingsType::ConfigNS, BSON(LockpingsType::ping() << 1), !unique); - if (!result.isOK()) { - warning() << "couldn't create lockping ping time index on config db" << causedBy(result); - } - - result = clusterCreateIndex( - txn, TagsType::ConfigNS, BSON(TagsType::ns() << 1 << TagsType::min() << 1), unique); - if (!result.isOK()) { - warning() << "could not create index ns_1_min_1: " << causedBy(result); - } -} - void ConfigServer::replicaSetChangeShardRegistryUpdateHook(const string& setName, const string& newConnectionString) { // Inform the ShardRegsitry of the new connection string for the shard. diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h index 5fee9fa39d3..cd504474b63 100644 --- a/src/mongo/s/config.h +++ b/src/mongo/s/config.h @@ -236,8 +236,6 @@ protected: class ConfigServer { public: - static void reloadSettings(OperationContext* txn); - /** * For use in mongos and mongod which needs notifications about changes to shard and config * server replset membership to update the ShardRegistry. diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 3f6ff1f06bb..d004db72a05 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -34,6 +34,7 @@ #include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/client/shard_registry.h" @@ -58,12 +59,14 @@ void Grid::init(std::unique_ptr<CatalogManager> catalogManager, std::unique_ptr<CatalogCache> catalogCache, std::unique_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager, + std::unique_ptr<BalancerConfiguration> balancerConfig, std::unique_ptr<executor::TaskExecutorPool> executorPool, executor::NetworkInterface* network) { invariant(!_catalogManager); invariant(!_catalogCache); invariant(!_shardRegistry); invariant(!_cursorManager); + invariant(!_balancerConfig); invariant(!_executorPool); invariant(!_network); @@ -71,6 +74,7 @@ void Grid::init(std::unique_ptr<CatalogManager> catalogManager, _catalogCache = std::move(catalogCache); _shardRegistry = std::move(shardRegistry); _cursorManager = std::move(cursorManager); + _balancerConfig = std::move(balancerConfig); _executorPool = std::move(executorPool); _network = network; } @@ -83,6 +87,11 @@ void Grid::setAllowLocalHost(bool allow) { _allowLocalShard = allow; } +repl::OpTime Grid::configOpTime() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _configOpTime; +} + void Grid::advanceConfigOpTime(repl::OpTime opTime) { stdx::lock_guard<stdx::mutex> lk(_mutex); if (_configOpTime < opTime) { @@ -95,8 +104,10 @@ void Grid::clearForUnitTests() { _catalogCache.reset(); _shardRegistry.reset(); _cursorManager.reset(); + _balancerConfig.reset(); _executorPool.reset(); _network = nullptr; + _configOpTime = repl::OpTime(); } diff --git a/src/mongo/s/grid.h b/src/mongo/s/grid.h index f0746b5f142..a990143a4c1 100644 --- a/src/mongo/s/grid.h +++ b/src/mongo/s/grid.h @@ -34,6 +34,7 @@ namespace mongo { +class BalancerConfiguration; class CatalogCache; class CatalogManager; class ClusterCursorManager; @@ -70,6 +71,7 @@ public: std::unique_ptr<CatalogCache> catalogCache, std::unique_ptr<ShardRegistry> shardRegistry, std::unique_ptr<ClusterCursorManager> cursorManager, + std::unique_ptr<BalancerConfiguration> balancerConfig, std::unique_ptr<executor::TaskExecutorPool> executorPool, executor::NetworkInterface* network); @@ -91,15 +93,15 @@ public: return _catalogManager.get(); } - CatalogCache* catalogCache() { + CatalogCache* catalogCache() const { return _catalogCache.get(); } - ShardRegistry* shardRegistry() { + ShardRegistry* shardRegistry() const { return _shardRegistry.get(); } - ClusterCursorManager* getCursorManager() { + ClusterCursorManager* getCursorManager() const { return _cursorManager.get(); } @@ -111,11 +113,12 @@ public: return _network; } - repl::OpTime configOpTime() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _configOpTime; + BalancerConfiguration* getBalancerConfiguration() const { + return _balancerConfig.get(); } + repl::OpTime configOpTime() const; + void advanceConfigOpTime(repl::OpTime opTime); /** @@ -134,6 +137,7 @@ private: std::unique_ptr<CatalogCache> _catalogCache; std::unique_ptr<ShardRegistry> _shardRegistry; std::unique_ptr<ClusterCursorManager> _cursorManager; + std::unique_ptr<BalancerConfiguration> _balancerConfig; // Executor pool for scheduling work and remote commands to shards and config servers. Each // contained executor has a connection hook set on it for sending/receiving sharding metadata. diff --git a/src/mongo/s/mongos_options.cpp b/src/mongo/s/mongos_options.cpp index 17e85baff74..96de91add0e 100644 --- a/src/mongo/s/mongos_options.cpp +++ b/src/mongo/s/mongos_options.cpp @@ -42,7 +42,6 @@ #include "mongo/config.h" #include "mongo/db/server_options.h" #include "mongo/db/server_options_helpers.h" -#include "mongo/s/chunk.h" #include "mongo/s/version_mongos.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -211,16 +210,18 @@ Status storeMongosOptions(const moe::Environment& params, const std::vector<std: } if (params.count("sharding.chunkSize")) { - int csize = params["sharding.chunkSize"].as<int>(); - - // validate chunksize before proceeding - if (csize == 0) { - return Status(ErrorCodes::BadValue, "error: need a non-zero chunksize"); + const int maxChunkSizeMB = params["sharding.chunkSize"].as<int>(); + if (maxChunkSizeMB <= 0) { + return Status(ErrorCodes::BadValue, "error: need a positive chunksize"); } - if (!Chunk::setMaxChunkSizeSizeMB(csize)) { + const uint64_t maxChunkSizeBytes = maxChunkSizeMB * 1024 * 1024; + + if (!BalancerConfiguration::checkMaxChunkSizeValid(maxChunkSizeBytes)) { return Status(ErrorCodes::BadValue, "MaxChunkSize invalid"); } + + mongosGlobalParams.maxChunkSizeBytes = maxChunkSizeBytes; } if (params.count("net.port")) { @@ -244,8 +245,8 @@ Status storeMongosOptions(const moe::Environment& params, const std::vector<std: } if (params.count("sharding.autoSplit")) { - Chunk::ShouldAutoSplit = params["sharding.autoSplit"].as<bool>(); - if (Chunk::ShouldAutoSplit == false) { + mongosGlobalParams.shouldAutoSplit = params["sharding.autoSplit"].as<bool>(); + if (!mongosGlobalParams.shouldAutoSplit) { warning() << "running with auto-splitting disabled"; } } diff --git a/src/mongo/s/mongos_options.h b/src/mongo/s/mongos_options.h index d9c10c87b47..e8eb1306b9c 100644 --- a/src/mongo/s/mongos_options.h +++ b/src/mongo/s/mongos_options.h @@ -31,6 +31,7 @@ #include "mongo/base/status.h" #include "mongo/client/connection_string.h" #include "mongo/db/server_options.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/util/options_parser/environment.h" #include "mongo/util/options_parser/option_section.h" @@ -44,9 +45,14 @@ class Environment; namespace moe = mongo::optionenvironment; struct MongosGlobalParams { + // The config server connection string ConnectionString configdbs; - MongosGlobalParams() = default; + // The max chunk size after which a chunk will be considered jumbo + uint64_t maxChunkSizeBytes{BalancerConfiguration::kDefaultMaxChunkSizeBytes}; + + // Whether auto-splitting is enabled + bool shouldAutoSplit{true}; }; extern MongosGlobalParams mongosGlobalParams; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 692d74e932c..5aa9ed42bda 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -47,7 +47,6 @@ #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/cluster_explain.h" #include "mongo/s/commands/cluster_commands_common.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" diff --git a/src/mongo/s/s_only.cpp b/src/mongo/s/s_only.cpp index 668124a7dd4..1cecb1aba3d 100644 --- a/src/mongo/s/s_only.cpp +++ b/src/mongo/s/s_only.cpp @@ -54,14 +54,14 @@ namespace mongo { using std::string; using std::stringstream; - bool isMongos() { return true; } -/** When this callback is run, we record a shard that we've used for useful work - * in an operation to be read later by getLastError() -*/ +/** + * When this callback is run, we record a shard that we've used for useful work in an operation to + * be read later by getLastError() + */ void usingAShardConnection(const std::string& addr) { ClusterLastErrorInfo::get(cc()).addShardHost(addr); } diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 4b48cc4eaa4..6bb4a34b759 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -60,14 +60,20 @@ #include "mongo/platform/process_id.h" #include "mongo/s/balance.h" #include "mongo/s/catalog/catalog_manager.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_locks.h" +#include "mongo/s/catalog/type_lockpings.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog/type_tags.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_connection_hook_for_mongos.h" +#include "mongo/s/cluster_write.h" +#include "mongo/s/commands/request.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/mongos_options.h" #include "mongo/s/query/cluster_cursor_cleanup_job.h" -#include "mongo/s/request.h" #include "mongo/s/sharding_initialization.h" #include "mongo/s/version_mongos.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -196,15 +202,76 @@ public: DBClientBase* createDirectClient(OperationContext* txn) { uassert(10197, "createDirectClient not implemented for sharding yet", 0); - return 0; + return nullptr; } } // namespace mongo using namespace mongo; +static void reloadSettings(OperationContext* txn) { + Grid::get(txn)->getBalancerConfiguration()->refreshAndCheck(txn); + + // Create the config data indexes + const bool unique = true; + + Status result = clusterCreateIndex( + txn, ChunkType::ConfigNS, BSON(ChunkType::ns() << 1 << ChunkType::min() << 1), unique); + if (!result.isOK()) { + warning() << "couldn't create ns_1_min_1 index on config db" << causedBy(result); + } + + result = clusterCreateIndex( + txn, + ChunkType::ConfigNS, + BSON(ChunkType::ns() << 1 << ChunkType::shard() << 1 << ChunkType::min() << 1), + unique); + if (!result.isOK()) { + warning() << "couldn't create ns_1_shard_1_min_1 index on config db" << causedBy(result); + } + + result = clusterCreateIndex(txn, + ChunkType::ConfigNS, + BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1), + unique); + if (!result.isOK()) { + warning() << "couldn't create ns_1_lastmod_1 index on config db" << causedBy(result); + } + + result = clusterCreateIndex(txn, ShardType::ConfigNS, BSON(ShardType::host() << 1), unique); + if (!result.isOK()) { + warning() << "couldn't create host_1 index on config db" << causedBy(result); + } + + result = clusterCreateIndex(txn, LocksType::ConfigNS, BSON(LocksType::lockID() << 1), !unique); + if (!result.isOK()) { + warning() << "couldn't create lock id index on config db" << causedBy(result); + } + + result = clusterCreateIndex(txn, + LocksType::ConfigNS, + BSON(LocksType::state() << 1 << LocksType::process() << 1), + !unique); + if (!result.isOK()) { + warning() << "couldn't create state and process id index on config db" << causedBy(result); + } + + result = + clusterCreateIndex(txn, LockpingsType::ConfigNS, BSON(LockpingsType::ping() << 1), !unique); + if (!result.isOK()) { + warning() << "couldn't create lockping ping time index on config db" << causedBy(result); + } + + result = clusterCreateIndex( + txn, TagsType::ConfigNS, BSON(TagsType::ns() << 1 << TagsType::min() << 1), unique); + if (!result.isOK()) { + warning() << "could not create index ns_1_min_1: " << causedBy(result); + } +} + static Status initializeSharding(OperationContext* txn) { - Status status = initializeGlobalShardingStateForMongos(txn, mongosGlobalParams.configdbs); + Status status = initializeGlobalShardingStateForMongos( + txn, mongosGlobalParams.configdbs, mongosGlobalParams.maxChunkSizeBytes); if (!status.isOK()) { return status; } @@ -265,7 +332,7 @@ static ExitCode runMongosServer() { return EXIT_SHARDING_ERROR; } - ConfigServer::reloadSettings(opCtx.get()); + reloadSettings(opCtx.get()); } #if !defined(_WIN32) @@ -407,7 +474,6 @@ MONGO_INITIALIZER_GENERAL(setSSLManagerType, return Status::OK(); } #endif -} // namespace int mongoSMain(int argc, char* argv[], char** envp) { static StaticObserver staticObserver; @@ -445,6 +511,8 @@ int mongoSMain(int argc, char* argv[], char** envp) { return 20; } +} // namespace + #if defined(_WIN32) // In Windows, wmain() is an alternate entry point for main(), and receives the same parameters // as main() but encoded in Windows Unicode (UTF-16); "wide" 16-bit wchar_t characters. The diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index f4df809a705..232996ddd2c 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -29,7 +29,6 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" -#include "mongo/platform/random.h" #include "mongo/s/sharding_initialization.h" @@ -47,6 +46,7 @@ #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/metadata_hook.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/sharding_network_connection_hook.h" #include "mongo/s/grid.h" @@ -133,6 +133,7 @@ std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(std::unique_ptr<NetworkIn Status initializeGlobalShardingState(OperationContext* txn, const ConnectionString& configCS, + uint64_t maxChunkSizeBytes, bool isMongos) { if (configCS.type() == ConnectionString::INVALID) { return {ErrorCodes::BadValue, "Unrecognized connection string."}; @@ -165,6 +166,7 @@ Status initializeGlobalShardingState(OperationContext* txn, stdx::make_unique<CatalogCache>(), std::move(shardRegistry), stdx::make_unique<ClusterCursorManager>(getGlobalServiceContext()->getPreciseClockSource()), + stdx::make_unique<BalancerConfiguration>(maxChunkSizeBytes), std::move(executorPool), networkPtr); @@ -199,13 +201,15 @@ Status initializeGlobalShardingState(OperationContext* txn, } // namespace Status initializeGlobalShardingStateForMongos(OperationContext* txn, - const ConnectionString& configCS) { - return initializeGlobalShardingState(txn, configCS, true); + const ConnectionString& configCS, + uint64_t maxChunkSizeBytes) { + return initializeGlobalShardingState(txn, configCS, maxChunkSizeBytes, true); } Status initializeGlobalShardingStateForMongod(OperationContext* txn, const ConnectionString& configCS) { - return initializeGlobalShardingState(txn, configCS, false); + return initializeGlobalShardingState( + txn, configCS, BalancerConfiguration::kDefaultMaxChunkSizeBytes, false); } } // namespace mongo diff --git a/src/mongo/s/sharding_initialization.h b/src/mongo/s/sharding_initialization.h index 9e4e1de346d..eeb1763df70 100644 --- a/src/mongo/s/sharding_initialization.h +++ b/src/mongo/s/sharding_initialization.h @@ -28,6 +28,8 @@ #pragma once +#include <cstdint> + namespace mongo { class ConnectionString; @@ -39,7 +41,8 @@ class Status; * CatalogManager, ShardingRegistry, and grid objects. */ Status initializeGlobalShardingStateForMongos(OperationContext* txn, - const ConnectionString& configCS); + const ConnectionString& configCS, + uint64_t maxChunkSizeBytes); Status initializeGlobalShardingStateForMongod(OperationContext* txn, const ConnectionString& configCS); diff --git a/src/mongo/s/sharding_test_fixture.cpp b/src/mongo/s/sharding_test_fixture.cpp index 8584741c0c5..2c937d47761 100644 --- a/src/mongo/s/sharding_test_fixture.cpp +++ b/src/mongo/s/sharding_test_fixture.cpp @@ -45,6 +45,7 @@ #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" +#include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/dist_lock_manager_mock.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set.h" @@ -132,14 +133,16 @@ void ShardingTestFixture::setUp() { auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS)); executorPool->startup(); - // For now initialize the global grid object. All sharding objects will be accessible - // from there until we get rid of it. - grid.init(std::move(cm), - stdx::make_unique<CatalogCache>(), - std::move(shardRegistry), - stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource()), - std::move(executorPool), - _mockNetwork); + // For now initialize the global grid object. All sharding objects will be accessible from there + // until we get rid of it. + grid.init( + std::move(cm), + stdx::make_unique<CatalogCache>(), + std::move(shardRegistry), + stdx::make_unique<ClusterCursorManager>(_service->getPreciseClockSource()), + stdx::make_unique<BalancerConfiguration>(BalancerConfiguration::kDefaultMaxChunkSizeBytes), + std::move(executorPool), + _mockNetwork); } void ShardingTestFixture::tearDown() { |