diff options
-rw-r--r-- | src/mongo/s/balancer/balancer.cpp | 166 | ||||
-rw-r--r-- | src/mongo/s/balancer/balancer.h | 23 | ||||
-rw-r--r-- | src/mongo/s/balancer/balancer_chunk_selection_policy.h | 9 | ||||
-rw-r--r-- | src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp | 32 | ||||
-rw-r--r-- | src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/s/balancer/balancer_policy.cpp | 36 | ||||
-rw-r--r-- | src/mongo/s/balancer/balancer_policy.h | 12 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 62 | ||||
-rw-r--r-- | src/mongo/s/chunk.h | 24 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_move_chunk_cmd.cpp | 44 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_shard_collection_cmd.cpp | 118 |
11 files changed, 292 insertions, 238 deletions
diff --git a/src/mongo/s/balancer/balancer.cpp b/src/mongo/s/balancer/balancer.cpp index e71c0ac0e21..95215846a5f 100644 --- a/src/mongo/s/balancer/balancer.cpp +++ b/src/mongo/s/balancer/balancer.cpp @@ -33,11 +33,13 @@ #include "mongo/s/balancer/balancer.h" #include "mongo/base/status_with.h" +#include "mongo/bson/util/bson_extract.h" #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/client.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/lite_parsed_query.h" #include "mongo/db/server_options.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/balancer/balancer_chunk_selection_policy_impl.h" @@ -52,6 +54,7 @@ #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include "mongo/s/move_chunk_request.h" #include "mongo/s/shard_util.h" #include "mongo/s/sharding_raii.h" #include "mongo/stdx/memory.h" @@ -75,6 +78,8 @@ namespace { const Seconds kBalanceRoundDefaultInterval(10); const Seconds kShortBalanceRoundInterval(1); +const char kChunkTooBig[] = "chunkTooBig"; + const auto getBalancer = ServiceContext::declareDecoration<Balancer>(); /** @@ -148,6 +153,82 @@ void warnOnMultiVersion(const vector<ClusterStatistics::ShardStatistics>& cluste warning() << sb.str(); } +/** + * Blocking method, which requests a single chunk migration to run. 
+ */ +Status executeSingleMigration(OperationContext* txn, + const MigrateInfo& migrateInfo, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + const NamespaceString nss(migrateInfo.ns); + + auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); + if (!scopedCMStatus.isOK()) { + return scopedCMStatus.getStatus(); + } + + ChunkManager* const cm = scopedCMStatus.getValue().cm(); + + shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey); + + BSONObjBuilder builder; + MoveChunkRequest::appendAsCommand( + &builder, + nss, + cm->getVersion(), + Grid::get(txn)->shardRegistry()->getConfigServerConnectionString(), + migrateInfo.from, + migrateInfo.to, + c->getMin(), + c->getMax(), + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete); + builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, + durationCount<Milliseconds>(Microseconds(txn->getRemainingMaxTimeMicros()))); + + BSONObj cmdObj = builder.obj(); + + Status status{ErrorCodes::NotYetInitialized, "Uninitialized"}; + + auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, migrateInfo.from); + if (!shard) { + status = {ErrorCodes::ShardNotFound, + str::stream() << "shard " << migrateInfo.from << " not found"}; + } else { + auto cmdStatus = shard->runCommand(txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + cmdObj, + Shard::RetryPolicy::kNotIdempotent); + if (!cmdStatus.isOK()) { + status = std::move(cmdStatus.getStatus()); + } else { + status = std::move(cmdStatus.getValue().commandStatus); + BSONObj cmdResponse = std::move(cmdStatus.getValue().response); + + // For backwards compatibility with 3.2 and earlier, where the move chunk command + // instead of returning a ChunkTooBig status includes an extra field in the response + bool chunkTooBig = false; + bsonExtractBooleanFieldWithDefault(cmdResponse, kChunkTooBig, false, &chunkTooBig); + if (chunkTooBig) { + invariant(!status.isOK()); + status = 
{ErrorCodes::ChunkTooBig, status.reason()}; + } + } + } + + if (!status.isOK()) { + log() << "Move chunk " << cmdObj << " failed" << causedBy(status); + return {status.code(), str::stream() << "move failed due to " << status.toString()}; + } + + cm->reload(txn); + + return Status::OK(); +} + MONGO_FP_DECLARE(skipBalanceRound); MONGO_FP_DECLARE(balancerRoundIntervalSetting); @@ -183,15 +264,34 @@ Status Balancer::rebalanceSingleChunk(OperationContext* txn, const ChunkType& ch return refreshStatus; } - try { - _moveChunks(txn, - {*migrateInfo}, - balancerConfig->getSecondaryThrottle(), - balancerConfig->waitForDelete()); - return Status::OK(); - } catch (const DBException& e) { - return e.toStatus(); + _moveChunks(txn, + {*migrateInfo}, + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete()); + + return Status::OK(); +} + +Status Balancer::moveSingleChunk(OperationContext* txn, + const ChunkType& chunk, + const ShardId& newShardId, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete) { + auto moveAllowedStatus = _chunkSelectionPolicy->checkMoveAllowed(txn, chunk, newShardId); + if (!moveAllowedStatus.isOK()) { + return moveAllowedStatus; } + + return executeSingleMigration(txn, + MigrateInfo(chunk.getNS(), newShardId, chunk), + maxChunkSizeBytes, + secondaryThrottle, + waitForDelete); +} + +std::string Balancer::name() const { + return "Balancer"; } void Balancer::run() { @@ -468,8 +568,10 @@ int Balancer::_moveChunks(OperationContext* txn, int movedCount = 0; for (const auto& migrateInfo : candidateChunks) { + auto balancerConfig = Grid::get(txn)->getBalancerConfiguration(); + // If the balancer was disabled since we started this round, don't start new chunk moves - if (!Grid::get(txn)->getBalancerConfiguration()->isBalancerActive()) { + if (!balancerConfig->isBalancerActive()) { LOG(1) << "Stopping balancing round early as balancing was disabled"; return movedCount; } @@ -487,47 
+589,33 @@ int Balancer::_moveChunks(OperationContext* txn, const NamespaceString nss(migrateInfo.ns); try { - auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss)); - ChunkManager* const cm = scopedCM.cm(); - - shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey); - - if (c->getMin().woCompare(migrateInfo.minKey) || - c->getMax().woCompare(migrateInfo.maxKey)) { - log() << "Migration " << migrateInfo - << " will be skipped this round due to chunk metadata mismatch."; - scopedCM.db()->getChunkManager(txn, nss.ns(), true); - continue; - } - - BSONObj res; - if (c->moveAndCommit(txn, - migrateInfo.to, - Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), - secondaryThrottle, - waitForDelete, - 0, /* maxTimeMS */ - res)) { + Status status = executeSingleMigration(txn, + migrateInfo, + balancerConfig->getMaxChunkSizeBytes(), + balancerConfig->getSecondaryThrottle(), + balancerConfig->waitForDelete()); + if (status.isOK()) { movedCount++; - continue; - } + } else if (status == ErrorCodes::ChunkTooBig) { + log() << "Performing a split because migrate failed for size reasons" + << causedBy(status); - log() << "balancer move failed: " << res << ", migrate: " << migrateInfo; + auto scopedCM = uassertStatusOK(ScopedChunkManager::getExisting(txn, nss)); + ChunkManager* const cm = scopedCM.cm(); - Status moveStatus = getStatusFromCommandResult(res); + shared_ptr<Chunk> c = cm->findIntersectingChunk(txn, migrateInfo.minKey); - if (moveStatus == ErrorCodes::ChunkTooBig || res["chunkTooBig"].trueValue()) { - log() << "Performing a split because migrate failed for size reasons"; - - auto splitStatus = c->split(txn, Chunk::normal, NULL); + auto splitStatus = c->split(txn, Chunk::normal, nullptr); if (!splitStatus.isOK()) { - log() << "marking chunk as jumbo: " << c->toString(); + log() << "Marking chunk " << c->toString() << " as jumbo."; c->markAsJumbo(txn); // We increment moveCount so we do another round right away 
movedCount++; } + } else { + log() << "Balancer move failed" << causedBy(status); } } catch (const DBException& ex) { log() << "balancer move " << migrateInfo << " failed" << causedBy(ex); diff --git a/src/mongo/s/balancer/balancer.h b/src/mongo/s/balancer/balancer.h index 63159beceee..ccac10d92db 100644 --- a/src/mongo/s/balancer/balancer.h +++ b/src/mongo/s/balancer/balancer.h @@ -72,14 +72,25 @@ public: */ Status rebalanceSingleChunk(OperationContext* txn, const ChunkType& chunk); -private: - // BackgroundJob methods implementation + /** + * Blocking call, which requests the balancer to move a single chunk to the specified location + * in accordance with the active balancer policy. An error will be returned if the attempt to + * move fails for any reason. + * + * NOTE: This call disregards the balancer enabled/disabled status and will proceed with the + * move regardless. It should be used only for user-initiated moves. + */ + Status moveSingleChunk(OperationContext* txn, + const ChunkType& chunk, + const ShardId& newShardId, + uint64_t maxChunkSizeBytes, + const MigrationSecondaryThrottleOptions& secondaryThrottle, + bool waitForDelete); - void run() override; +private: + std::string name() const final; - std::string name() const override { - return "Balancer"; - } + void run() final; /** * Checks that the balancer can connect to all servers it needs to do its job. diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy.h b/src/mongo/s/balancer/balancer_chunk_selection_policy.h index 7bdf049c331..1329f3f9194 100644 --- a/src/mongo/s/balancer/balancer_chunk_selection_policy.h +++ b/src/mongo/s/balancer/balancer_chunk_selection_policy.h @@ -103,6 +103,15 @@ public: virtual StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove( OperationContext* txn, const ChunkType& chunk) = 0; + /** + * Asks the chunk selection policy to validate that the specified chunk migration is allowed + * given the current rules. 
Returns OK if the migration won't violate any rules or any other + * failed status otherwise. + */ + virtual Status checkMoveAllowed(OperationContext* txn, + const ChunkType& chunk, + const ShardId& newShardId) = 0; + protected: BalancerChunkSelectionPolicy(); }; diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp index 9993bd0d6f8..a685dee7317 100644 --- a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp +++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.cpp @@ -217,6 +217,38 @@ BalancerChunkSelectionPolicyImpl::selectSpecificChunkToMove(OperationContext* tx return boost::optional<MigrateInfo>{MigrateInfo(nss.ns(), newShardId, chunk)}; } +Status BalancerChunkSelectionPolicyImpl::checkMoveAllowed(OperationContext* txn, + const ChunkType& chunk, + const ShardId& newShardId) { + auto tagForChunkStatus = + Grid::get(txn)->catalogManager(txn)->getTagForChunk(txn, chunk.getNS(), chunk); + if (!tagForChunkStatus.isOK()) { + return tagForChunkStatus.getStatus(); + } + + auto shardStatsStatus = _clusterStats->getStats(txn); + if (!shardStatsStatus.isOK()) { + return shardStatsStatus.getStatus(); + } + + const auto& shardStats = shardStatsStatus.getValue(); + + auto newShardIterator = + std::find_if(shardStats.begin(), + shardStats.end(), + [&newShardId](const ClusterStatistics::ShardStatistics& stat) { + return stat.shardId == newShardId; + }); + if (newShardIterator == shardStats.end()) { + return {ErrorCodes::ShardNotFound, + str::stream() << "Unable to find constraints information for shard " << newShardId + << ". 
Move to this shard will be disallowed."}; + } + + return DistributionStatus::isShardSuitableReceiver(*newShardIterator, + tagForChunkStatus.getValue()); +} + StatusWith<SplitInfoVector> BalancerChunkSelectionPolicyImpl::_getSplitCandidatesForCollection( OperationContext* txn, const NamespaceString& nss) { auto scopedCMStatus = ScopedChunkManager::getExisting(txn, nss); diff --git a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h index ab6230243d9..0b1cb9582a6 100644 --- a/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h +++ b/src/mongo/s/balancer/balancer_chunk_selection_policy_impl.h @@ -47,6 +47,10 @@ public: StatusWith<boost::optional<MigrateInfo>> selectSpecificChunkToMove( OperationContext* txn, const ChunkType& chunk) override; + Status checkMoveAllowed(OperationContext* txn, + const ChunkType& chunk, + const ShardId& newShardId) override; + private: /** * Synchronous method, which iterates the collection's chunks and uses the tags information to diff --git a/src/mongo/s/balancer/balancer_policy.cpp b/src/mongo/s/balancer/balancer_policy.cpp index 17cd2136979..cc7b47693e5 100644 --- a/src/mongo/s/balancer/balancer_policy.cpp +++ b/src/mongo/s/balancer/balancer_policy.cpp @@ -97,23 +97,35 @@ unsigned DistributionStatus::numberOfChunksInShardWithTag(const ShardId& shardId return total; } +Status DistributionStatus::isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat, + const string& chunkTag) { + if (stat.isSizeMaxed()) { + return {ErrorCodes::IllegalOperation, + str::stream() << stat.shardId + << " has already reached the maximum total chunk size."}; + } + + if (stat.isDraining) { + return {ErrorCodes::IllegalOperation, + str::stream() << stat.shardId << " is currently draining."}; + } + + if (!chunkTag.empty() && !stat.shardTags.count(chunkTag)) { + return {ErrorCodes::IllegalOperation, + str::stream() << stat.shardId << " doesn't have right tag"}; + } + + 
return Status::OK(); +} + string DistributionStatus::getBestReceieverShard(const string& tag) const { string best; unsigned minChunks = numeric_limits<unsigned>::max(); for (const auto& stat : _shardInfo) { - if (stat.isSizeMaxed()) { - LOG(1) << stat.shardId << " has already reached the maximum total chunk size."; - continue; - } - - if (stat.isDraining) { - LOG(1) << stat.shardId << " is currently draining."; - continue; - } - - if (!tag.empty() && !stat.shardTags.count(tag)) { - LOG(1) << stat.shardId << " doesn't have right tag"; + auto status = isShardSuitableReceiver(stat, tag); + if (!status.isOK()) { + LOG(1) << status.codeString(); continue; } diff --git a/src/mongo/s/balancer/balancer_policy.h b/src/mongo/s/balancer/balancer_policy.h index 45fb6409294..428349098a1 100644 --- a/src/mongo/s/balancer/balancer_policy.h +++ b/src/mongo/s/balancer/balancer_policy.h @@ -78,20 +78,24 @@ class DistributionStatus { public: DistributionStatus(ShardStatisticsVector shardInfo, const ShardToChunksMap& shardToChunksMap); - // only used when building - /** * @return if range is valid */ bool addTagRange(const TagRange& range); - // ---- these methods might be better suiting in BalancerPolicy + /** + * Determines whether a shard with the specified utilization statistics would be able to accept + * a chunk with the specified tag. According to the policy a shard cannot accept chunks if its + * size is maxed out, it is draining, or the chunk's tag is not among the shard's tags. 
+ */ + static Status isShardSuitableReceiver(const ClusterStatistics::ShardStatistics& stat, + const std::string& chunkTag); /** * @param forTag "" if you don't care, or a tag * @return shard best suited to receive a chunk */ - std::string getBestReceieverShard(const std::string& forTag) const; + std::string getBestReceieverShard(const std::string& tag) const; /** * @return the shard with the most chunks diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 44ebb40b5b3..b6096e3051d 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -33,11 +33,9 @@ #include "mongo/s/chunk.h" #include "mongo/client/connpool.h" -#include "mongo/client/read_preference.h" #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" #include "mongo/platform/random.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/balancer/balancer.h" #include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_manager.h" @@ -46,7 +44,6 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" -#include "mongo/s/move_chunk_request.h" #include "mongo/s/shard_util.h" #include "mongo/util/log.h" @@ -297,65 +294,6 @@ StatusWith<boost::optional<std::pair<BSONObj, BSONObj>>> Chunk::split( return splitStatus.getValue(); } -bool Chunk::moveAndCommit(OperationContext* txn, - const ShardId& toShardId, - long long chunkSize /* bytes */, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete, - int maxTimeMS, - BSONObj& res) const { - uassert(10167, "can't move shard to its current location!", getShardId() != toShardId); - - BSONObjBuilder builder; - MoveChunkRequest::appendAsCommand(&builder, - NamespaceString(_manager->getns()), - _manager->getVersion(), - grid.shardRegistry()->getConfigServerConnectionString(), - _shardId, - toShardId, - _min, - _max, - chunkSize, - secondaryThrottle, - waitForDelete); - builder.append(LiteParsedQuery::cmdOptionMaxTimeMS, 
maxTimeMS); - - BSONObj cmdObj = builder.obj(); - log() << "Moving chunk with the following arguments: " << cmdObj; - - Status status{ErrorCodes::NotYetInitialized, "Uninitialized"}; - - auto shard = Grid::get(txn)->shardRegistry()->getShard(txn, _shardId); - if (!shard) { - status = Status(ErrorCodes::ShardNotFound, - str::stream() << "shard " << _shardId << " not found"); - } else { - auto response = shard->runCommand(txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - cmdObj, - Shard::RetryPolicy::kNotIdempotent); - if (!response.isOK()) { - status = std::move(response.getStatus()); - } else { - status = std::move(response.getValue().commandStatus); - res = std::move(response.getValue().response); - } - } - - if (status.isOK()) { - LOG(1) << "moveChunk result: " << res; - } else { - warning() << "Move chunk failed" << causedBy(status); - } - - // If succeeded we needs to reload the chunk manager in order to pick up the new location. If - // failed, mongos may be stale. - _manager->reload(txn); - - return status.isOK(); -} - bool Chunk::splitIfShould(OperationContext* txn, long dataWritten) { LastError::Disabled d(&LastError::get(cc())); diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index d9cf1886a6b..28bb1e519cc 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -37,7 +37,6 @@ namespace mongo { class ChunkManager; class ChunkType; -class MigrationSecondaryThrottleOptions; class OperationContext; /** @@ -132,29 +131,6 @@ public: SplitPointMode mode, size_t* resultingSplits) const; - // - // migration support - // - - /** - * Issues a migrate request for this chunk - * - * @param to shard to move this chunk to - * @param chunSize maximum number of bytes beyond which the migrate should no go trhough - * @param writeConcern detailed write concern. NULL means the default write concern. 
- * @param waitForDelete whether chunk move should wait for cleanup or return immediately - * @param maxTimeMS max time for the migrate request - * @param res the object containing details about the migrate execution - * @return true if move was successful - */ - bool moveAndCommit(OperationContext* txn, - const ShardId& to, - long long chunkSize, - const MigrationSecondaryThrottleOptions& secondaryThrottle, - bool waitForDelete, - int maxTimeMS, - BSONObj& res) const; - /** * marks this chunk as a jumbo chunk * that means the chunk will be inelligble for migrates diff --git a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp index ae156c39753..d853f68a244 100644 --- a/src/mongo/s/commands/cluster_move_chunk_cmd.cpp +++ b/src/mongo/s/commands/cluster_move_chunk_cmd.cpp @@ -38,6 +38,7 @@ #include "mongo/db/client_basic.h" #include "mongo/db/commands.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/balancer.h" #include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/chunk_manager.h" @@ -213,39 +214,30 @@ public: } } - LOG(0) << "CMD: movechunk: " << cmdObj; - - StatusWith<int> maxTimeMS = - LiteParsedQuery::parseMaxTimeMS(cmdObj[LiteParsedQuery::cmdOptionMaxTimeMS]); - - if (!maxTimeMS.isOK()) { - errmsg = maxTimeMS.getStatus().reason(); - return false; - } - const auto secondaryThrottle = uassertStatusOK(MigrationSecondaryThrottleOptions::createFromCommand(cmdObj)); - BSONObj res; - if (!chunk->moveAndCommit(txn, - to->getId(), - maxChunkSizeBytes, - secondaryThrottle, - cmdObj["_waitForDelete"].trueValue(), - maxTimeMS.getValue(), - res)) { - errmsg = "move failed"; - result.append("cause", res); - - if (!res["code"].eoo()) { - result.append(res["code"]); - } + log() << "CMD: movechunk: " << cmdObj; - return false; + { + ChunkType chunkType; + chunkType.setNS(nss.ns()); + chunkType.setName(ChunkType::genID(nss.ns(), chunk->getMin())); + 
chunkType.setMin(chunk->getMin()); + chunkType.setMax(chunk->getMax()); + chunkType.setShard(chunk->getShardId()); + chunkType.setVersion(info->getVersion()); + + uassertStatusOK( + Balancer::get(txn)->moveSingleChunk(txn, + chunkType, + to->getId(), + maxChunkSizeBytes, + secondaryThrottle, + cmdObj["_waitForDelete"].trueValue())); } result.append("millis", t.millis()); - return true; } diff --git a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp index 7bebe1ac3c7..6eddee0f17c 100644 --- a/src/mongo/s/commands/cluster_shard_collection_cmd.cpp +++ b/src/mongo/s/commands/cluster_shard_collection_cmd.cpp @@ -44,6 +44,7 @@ #include "mongo/db/commands.h" #include "mongo/db/hasher.h" #include "mongo/db/write_concern_options.h" +#include "mongo/s/balancer/balancer.h" #include "mongo/s/balancer/balancer_configuration.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/catalog/catalog_manager.h" @@ -110,33 +111,17 @@ public: int options, std::string& errmsg, BSONObjBuilder& result) { - const string ns = parseNs(dbname, cmdObj); - if (ns.size() == 0) { - errmsg = "no ns"; - return false; - } + const NamespaceString nss(parseNs(dbname, cmdObj)); + uassert(ErrorCodes::InvalidNamespace, "Invalid namespace", nss.isValid()); - const NamespaceString nsStr(ns); - if (!nsStr.isValid()) { - return appendCommandStatus( - result, - Status(ErrorCodes::InvalidNamespace, "invalid collection namespace [" + ns + "]")); - } - - auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString())); - if (!config->isShardingEnabled()) { - return appendCommandStatus( - result, - Status(ErrorCodes::IllegalOperation, - str::stream() << "sharding not enabled for db " << nsStr.db())); - } + auto config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString())); + uassert(ErrorCodes::IllegalOperation, + str::stream() << "sharding not enabled for db " << nss.db(), + 
config->isShardingEnabled()); - if (config->isSharded(ns)) { - return appendCommandStatus( - result, - Status(ErrorCodes::IllegalOperation, - str::stream() << "sharding already enabled for collection " << ns)); - } + uassert(ErrorCodes::IllegalOperation, + str::stream() << "sharding already enabled for collection " << nss.ns(), + !config->isSharded(nss.ns())); // NOTE: We *must* take ownership of the key here - otherwise the shared BSONObj // becomes corrupt as soon as the command ends. @@ -166,10 +151,7 @@ public: return false; } - if (ns.find(".system.") != string::npos) { - errmsg = "can't shard system namespaces"; - return false; - } + uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem()); vector<ShardId> shardIds; grid.shardRegistry()->getAllShardIds(&shardIds); @@ -200,8 +182,8 @@ public: // check that collection is not capped BSONObj res; { - list<BSONObj> all = conn->getCollectionInfos( - config->name(), BSON("name" << nsToCollectionSubstring(ns))); + list<BSONObj> all = + conn->getCollectionInfos(config->name(), BSON("name" << nss.coll())); if (!all.empty()) { res = all.front().getOwned(); } @@ -241,7 +223,7 @@ public: // 5. If the collection is empty, and it's still possible to create an index // on the proposed key, we go ahead and do so. - list<BSONObj> indexes = conn->getIndexSpecs(ns); + list<BSONObj> indexes = conn->getIndexSpecs(nss.ns()); // 1. Verify consistency with existing unique indexes ShardKeyPattern proposedShardKey(proposedKey); @@ -251,7 +233,7 @@ public: bool isUnique = idx["unique"].trueValue(); if (isUnique && !proposedShardKey.isUniqueIndexCompatible(currentKey)) { - errmsg = str::stream() << "can't shard collection '" << ns << "' " + errmsg = str::stream() << "can't shard collection '" << nss.ns() << "' " << "with unique index on " << currentKey << " " << "and proposed shard key " << proposedKey << ". 
" << "Uniqueness can't be maintained unless " @@ -276,7 +258,7 @@ public: // per field per collection. if (isHashedShardKey && !idx["seed"].eoo() && idx["seed"].numberInt() != BSONElementHasher::DEFAULT_HASH_SEED) { - errmsg = str::stream() << "can't shard collection " << ns + errmsg = str::stream() << "can't shard collection " << nss.ns() << " with hashed shard key " << proposedKey << " because the hashed index uses a non-default" << " seed of " << idx["seed"].numberInt(); @@ -291,7 +273,7 @@ public: // 3. If proposed key is required to be unique, additionally check for exact match. bool careAboutUnique = cmdObj["unique"].trueValue(); if (hasUsefulIndexForKey && careAboutUnique) { - BSONObj eqQuery = BSON("ns" << ns << "key" << proposedKey); + BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey); BSONObj eqQueryResult; for (list<BSONObj>::iterator it = indexes.begin(); it != indexes.end(); ++it) { @@ -311,8 +293,8 @@ public: bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id"); if (!isExplicitlyUnique && !isCurrentID) { - errmsg = str::stream() << "can't shard collection " << ns << ", " << proposedKey - << " index not unique, " + errmsg = str::stream() << "can't shard collection " << nss.ns() << ", " + << proposedKey << " index not unique, " << "and unique index explicitly specified"; conn.done(); return false; @@ -324,7 +306,7 @@ public: // Check 2.iii and 2.iv. Make sure no null entries in the sharding index // and that there is a useful, non-multikey index available BSONObjBuilder checkShardingIndexCmd; - checkShardingIndexCmd.append("checkShardingIndex", ns); + checkShardingIndexCmd.append("checkShardingIndex", nss.ns()); checkShardingIndexCmd.append("keyPattern", proposedKey); if (!conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res)) { @@ -332,7 +314,7 @@ public: conn.done(); return false; } - } else if (conn->count(ns) != 0) { + } else if (conn->count(nss.ns()) != 0) { // 4. 
if no useful index, and collection is non-empty, fail errmsg = str::stream() << "please create an index that starts with the " << "shard key before sharding."; @@ -344,7 +326,7 @@ public: // 5. If no useful index exists, and collection empty, create one on proposedKey. // Only need to call ensureIndex on primary shard, since indexes get copied to // receiving shard whenever a migrate occurs. - Status status = clusterCreateIndex(txn, ns, proposedKey, careAboutUnique); + Status status = clusterCreateIndex(txn, nss.ns(), proposedKey, careAboutUnique); if (!status.isOK()) { errmsg = str::stream() << "ensureIndex failed to create index on " << "primary shard: " << status.reason(); @@ -353,7 +335,7 @@ public: } } - bool isEmpty = (conn->count(ns) == 0); + bool isEmpty = (conn->count(nss.ns()) == 0); conn.done(); @@ -419,25 +401,26 @@ public: LOG(0) << "CMD: shardcollection: " << cmdObj; - audit::logShardCollection(ClientBasic::getCurrent(), ns, proposedKey, careAboutUnique); + audit::logShardCollection( + ClientBasic::getCurrent(), nss.ns(), proposedKey, careAboutUnique); Status status = grid.catalogManager(txn)->shardCollection( - txn, ns, proposedShardKey, careAboutUnique, initSplits, std::set<ShardId>{}); + txn, nss.ns(), proposedShardKey, careAboutUnique, initSplits, {}); if (!status.isOK()) { return appendCommandStatus(result, status); } // Make sure the cached metadata for the collection knows that we are now sharded - config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nsStr.db().toString())); - config->getChunkManager(txn, nsStr.ns(), true /* force */); + config = uassertStatusOK(grid.catalogCache()->getDatabase(txn, nss.db().toString())); + config->getChunkManager(txn, nss.ns(), true /* force */); - result << "collectionsharded" << ns; + result << "collectionsharded" << nss.ns(); // Only initially move chunks when using a hashed shard key if (isHashedShardKey && isEmpty) { // Reload the new config info. 
If we created more than one initial chunk, then // we need to move them around to balance. - shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, ns, true); + shared_ptr<ChunkManager> chunkManager = config->getChunkManager(txn, nss.ns(), true); ChunkMap chunkMap = chunkManager->getChunkMap(); // 2. Move and commit each "big chunk" to a different shard. @@ -451,25 +434,30 @@ public: shared_ptr<Chunk> chunk = c->second; - // can't move chunk to shard it's already on + // Can't move chunk to shard it's already on if (to->getId() == chunk->getShardId()) { continue; } - BSONObj moveResult; - WriteConcernOptions noThrottle; - if (!chunk->moveAndCommit( - txn, - to->getId(), - Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), - MigrationSecondaryThrottleOptions::create( - MigrationSecondaryThrottleOptions::kOff), - true, - 0, - moveResult)) { + ChunkType chunkType; + chunkType.setNS(nss.ns()); + chunkType.setName(ChunkType::genID(nss.ns(), chunk->getMin())); + chunkType.setMin(chunk->getMin()); + chunkType.setMax(chunk->getMax()); + chunkType.setShard(chunk->getShardId()); + chunkType.setVersion(chunkManager->getVersion()); + + Status moveStatus = Balancer::get(txn)->moveSingleChunk( + txn, + chunkType, + to->getId(), + Grid::get(txn)->getBalancerConfiguration()->getMaxChunkSizeBytes(), + MigrationSecondaryThrottleOptions::create( + MigrationSecondaryThrottleOptions::kOff), + true); + if (!moveStatus.isOK()) { warning() << "couldn't move chunk " << chunk->toString() << " to shard " << *to - << " while sharding collection " << ns << "." - << " Reason: " << moveResult; + << " while sharding collection " << nss.ns() << causedBy(moveStatus); } } @@ -478,7 +466,7 @@ public: } // Reload the config info, after all the migrations - chunkManager = config->getChunkManager(txn, ns, true); + chunkManager = config->getChunkManager(txn, nss.ns(), true); // 3. 
Subdivide the big chunks by splitting at each of the points in "allSplits" // that we haven't already split by. @@ -491,7 +479,7 @@ public: auto splitStatus = shardutil::splitChunkAtMultiplePoints( txn, currentChunk->getShardId(), - NamespaceString(ns), + nss, chunkManager->getShardKeyPattern(), chunkManager->getVersion(), currentChunk->getMin(), @@ -499,7 +487,7 @@ public: subSplits); if (!splitStatus.isOK()) { warning() << "couldn't split chunk " << currentChunk->toString() - << " while sharding collection " << ns + << " while sharding collection " << nss.ns() << causedBy(splitStatus.getStatus()); } @@ -523,7 +511,7 @@ public: // Proactively refresh the chunk manager. Not really necessary, but this way it's // immediately up-to-date the next time it's used. - config->getChunkManager(txn, ns, true); + config->getChunkManager(txn, nss.ns(), true); } return true; |