/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"
#include <list>
#include <set>
#include <vector>
#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/hasher.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_write.h"
#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
#include "mongo/s/migration_secondary_throttle_options.h"
#include "mongo/s/request_types/shard_collection_gen.h"
#include "mongo/s/shard_util.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
/**
* Constructs the BSON specification document for the given namespace, index key and options.
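*
* For illustration (a sketch of the expected shape, not verbatim server
* output): for nss "db.coll", keys {a: 1}, an empty collation and
* unique=false this produces roughly
*   { createIndexes: "coll",
*     indexes: [ { key: { a: 1 }, name: "a_1" } ],
*     writeConcern: { w: "majority" } }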
*/
BSONObj makeCreateIndexesCmd(const NamespaceString& nss,
const BSONObj& keys,
const BSONObj& collation,
bool unique) {
BSONObjBuilder index;
// Required fields for an index.
index.append("key", keys);
StringBuilder indexName;
bool isFirstKey = true;
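// Derive the index name from the key pattern, mirroring the shell's naming
// convention; e.g. {a: 1, b: "hashed"} becomes "a_1_b_hashed".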
for (BSONObjIterator keyIter(keys); keyIter.more();) {
BSONElement currentKey = keyIter.next();
if (isFirstKey) {
isFirstKey = false;
} else {
indexName << "_";
}
indexName << currentKey.fieldName() << "_";
if (currentKey.isNumber()) {
indexName << currentKey.numberInt();
} else {
indexName << currentKey.str(); // this should match up with shell command
}
}
index.append("name", indexName.str());
// Index options.
if (!collation.isEmpty()) {
// Creating an index with the "collation" option requires a v=2 index.
index.append("v", static_cast(IndexDescriptor::IndexVersion::kV2));
index.append("collation", collation);
}
if (unique && !IndexDescriptor::isIdIndexPattern(keys)) {
index.appendBool("unique", unique);
}
// The outer createIndexes command.
BSONObjBuilder createIndexes;
createIndexes.append("createIndexes", nss.coll());
createIndexes.append("indexes", BSON_ARRAY(index.obj()));
createIndexes.append("writeConcern", WriteConcernOptions::Majority);
return createIndexes.obj();
}
class ShardCollectionCmd : public Command {
public:
ShardCollectionCmd() : Command("shardCollection", "shardcollection") {}
bool slaveOk() const override {
return true;
}
bool adminOnly() const override {
return true;
}
bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
void help(std::stringstream& help) const override {
help << "Shard a collection. Requires key. Optional unique."
<< " Sharding must already be enabled for the database.\n"
<< " { enablesharding : \"\" }\n";
}
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) override {
if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
ActionType::enableSharding)) {
return Status(ErrorCodes::Unauthorized, "Unauthorized");
}
return Status::OK();
}
std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsFullyQualified(dbname, cmdObj);
}
bool run(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj,
std::string& errmsg,
BSONObjBuilder& result) override {
const NamespaceString nss(parseNs(dbname, cmdObj));
auto shardCollRequest =
ShardCollection::parse(IDLParserErrorContext("ShardCollection"), cmdObj);
auto const catalogClient = Grid::get(opCtx)->catalogClient(opCtx);
auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
auto const catalogCache = Grid::get(opCtx)->catalogCache();
auto dbInfo = uassertStatusOK(catalogCache->getDatabase(opCtx, nss.db()));
// Ensure sharding is allowed on the database
uassert(ErrorCodes::IllegalOperation,
str::stream() << "sharding not enabled for db " << nss.db(),
dbInfo.shardingEnabled());
auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
// Ensure that the collection is not sharded already
uassert(ErrorCodes::IllegalOperation,
str::stream() << "sharding already enabled for collection " << nss.ns(),
!routingInfo.cm());
auto proposedKey(shardCollRequest.getKey().getOwned());
ShardKeyPattern proposedKeyPattern(proposedKey);
if (!proposedKeyPattern.isValid()) {
errmsg = str::stream() << "Unsupported shard key pattern. Pattern must"
<< " either be a single hashed field, or a list"
<< " of ascending fields.";
return false;
}
bool isHashedShardKey = proposedKeyPattern.isHashedPattern();
bool careAboutUnique = shardCollRequest.getUnique();
if (isHashedShardKey && careAboutUnique) {
dassert(proposedKey.nFields() == 1);
// it's possible to ensure uniqueness on the hashed field by
// declaring an additional (non-hashed) unique index on the field,
// but the hashed shard key itself should not be declared unique
errmsg = "hashed shard keys cannot be declared unique.";
return false;
}
uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem());
// Ensure that the collation is valid. Currently we only allow the simple collation.
bool simpleCollationSpecified = false;
if (shardCollRequest.getCollation()) {
auto& collation = *shardCollRequest.getCollation();
auto collator =
CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation);
if (!collator.getStatus().isOK()) {
return appendCommandStatus(result, collator.getStatus());
}
if (collator.getValue()) {
return appendCommandStatus(
result,
{ErrorCodes::BadValue,
str::stream()
<< "The collation for shardCollection must be {locale: 'simple'}, "
<< "but found: "
<< collation});
}
simpleCollationSpecified = true;
}
std::vector<ShardId> shardIds;
shardRegistry->getAllShardIds(&shardIds);
const int numShards = shardIds.size();
// Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000
// chunks in total to limit the amount of memory this command consumes so there is less
// danger of an OOM error.
const int maxNumInitialChunksForShards = numShards * 8192;
const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption
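// For example, with 3 shards the per-shard limit allows at most 24576 initial
// chunks; the 1,000,000 total cap only becomes the binding limit beyond
// roughly 122 shards.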
int numChunks = shardCollRequest.getNumInitialChunks();
if (numChunks > maxNumInitialChunksForShards || numChunks > maxNumInitialChunksTotal) {
errmsg = str::stream()
<< "numInitialChunks cannot be more than either: " << maxNumInitialChunksForShards
<< ", 8192 * number of shards; or " << maxNumInitialChunksTotal;
return false;
}
// The rest of the checks require a connection to the primary db
auto primaryShard = routingInfo.primary();
ScopedDbConnection conn(primaryShard->getConnString());
// Retrieve the collection metadata in order to verify that it is legal to shard this
// collection.
BSONObj res;
{
std::list<BSONObj> all =
conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
if (!all.empty()) {
res = all.front().getOwned();
}
}
BSONObj defaultCollation;
if (!res.isEmpty()) {
// Check that namespace is not a view.
{
std::string namespaceType;
auto status = bsonExtractStringField(res, "type", &namespaceType);
if (!status.isOK()) {
conn.done();
return appendCommandStatus(result, status);
}
if (namespaceType == "view") {
conn.done();
return appendCommandStatus(
result,
{ErrorCodes::CommandNotSupportedOnView, "Views cannot be sharded."});
}
}
BSONObj collectionOptions;
if (res["options"].type() == BSONType::Object) {
collectionOptions = res["options"].Obj();
}
// Check that collection is not capped.
if (collectionOptions["capped"].trueValue()) {
errmsg = "can't shard capped collection";
conn.done();
return false;
}
// Get collection default collation.
{
BSONElement collationElement;
auto status = bsonExtractTypedField(
collectionOptions, "collation", BSONType::Object, &collationElement);
if (status.isOK()) {
defaultCollation = collationElement.Obj().getOwned();
if (defaultCollation.isEmpty()) {
conn.done();
return appendCommandStatus(
result,
{ErrorCodes::BadValue,
"Default collation in collection metadata cannot be empty."});
}
} else if (status != ErrorCodes::NoSuchKey) {
conn.done();
return appendCommandStatus(
result,
{status.code(),
str::stream()
<< "Could not parse default collation in collection metadata "
<< causedBy(status)});
}
}
// If the collection has a non-simple default collation but the user did not specify the
// simple collation explicitly, return an error.
if (!defaultCollation.isEmpty() && !simpleCollationSpecified) {
conn.done();
return appendCommandStatus(result,
{ErrorCodes::BadValue,
str::stream()
<< "Collection has default collation: "
<< collectionOptions["collation"]
<< ". Must specify collation {locale: 'simple'}"});
}
}
// The proposed shard key must be validated against the set of existing indexes.
// In particular, we must ensure the following constraints
//
// 1. All existing unique indexes, except those which start with the _id index,
// must contain the proposed key as a prefix (uniqueness of the _id index is
// ensured by the _id generation process or guaranteed by the user).
//
// 2. If the collection is not empty, there must exist at least one index that
// is "useful" for the proposed key. A "useful" index is defined as follows
// Useful Index:
// i. contains proposedKey as a prefix
// ii. is not a sparse index, partial index, or index with a non-simple collation
// iii. contains no null values
// iv. is not multikey (maybe lift this restriction later)
// v. if a hashed index, has default seed (lift this restriction later)
//
// 3. If the proposed shard key is specified as unique, there must exist a useful,
// unique index exactly equal to the proposedKey (not just a prefix).
//
// After validating these constraints:
//
// 4. If there is no useful index, and the collection is non-empty, we
// must fail.
//
// 5. If the collection is empty, and it's still possible to create an index
// on the proposed key, we go ahead and do so.
std::list<BSONObj> indexes = conn->getIndexSpecs(nss.ns());
// 1. Verify consistency with existing unique indexes
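// A unique index is compatible when the proposed shard key is a prefix of its
// key pattern (e.g. shard key {a: 1} with unique index {a: 1, b: 1}), since
// uniqueness can then be enforced within each shard.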
ShardKeyPattern proposedShardKey(proposedKey);
for (const auto& idx : indexes) {
BSONObj currentKey = idx["key"].embeddedObject();
bool isUnique = idx["unique"].trueValue();
if (isUnique && !proposedShardKey.isUniqueIndexCompatible(currentKey)) {
errmsg = str::stream() << "can't shard collection '" << nss.ns() << "' "
<< "with unique index on " << currentKey << " "
<< "and proposed shard key " << proposedKey << ". "
<< "Uniqueness can't be maintained unless "
<< "shard key is a prefix";
conn.done();
return false;
}
}
// 2. Check for a useful index
bool hasUsefulIndexForKey = false;
for (const auto& idx : indexes) {
BSONObj currentKey = idx["key"].embeddedObject();
// Check 2.i. and 2.ii.
if (!idx["sparse"].trueValue() && idx["filter"].eoo() && idx["collation"].eoo() &&
proposedKey.isPrefixOf(currentKey, SimpleBSONElementComparator::kInstance)) {
// We can't currently use hashed indexes with a non-default hash seed
// Check v.
// Note that this means that, for sharding, we only support one hashed index
// per field per collection.
if (isHashedShardKey && !idx["seed"].eoo() &&
idx["seed"].numberInt() != BSONElementHasher::DEFAULT_HASH_SEED) {
errmsg = str::stream() << "can't shard collection " << nss.ns()
<< " with hashed shard key " << proposedKey
<< " because the hashed index uses a non-default"
<< " seed of " << idx["seed"].numberInt();
conn.done();
return false;
}
hasUsefulIndexForKey = true;
}
}
// 3. If proposed key is required to be unique, additionally check for exact match.
if (hasUsefulIndexForKey && careAboutUnique) {
BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey);
BSONObj eqQueryResult;
for (const auto& idx : indexes) {
if (SimpleBSONObjComparator::kInstance.evaluate(idx["key"].embeddedObject() ==
proposedKey)) {
eqQueryResult = idx;
break;
}
}
if (eqQueryResult.isEmpty()) {
// If no exact match, index not useful, but still possible to create one later
hasUsefulIndexForKey = false;
} else {
bool isExplicitlyUnique = eqQueryResult["unique"].trueValue();
BSONObj currKey = eqQueryResult["key"].embeddedObject();
bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id");
if (!isExplicitlyUnique && !isCurrentID) {
errmsg = str::stream() << "can't shard collection " << nss.ns() << ", "
<< proposedKey << " index not unique, "
<< "and unique index explicitly specified";
conn.done();
return false;
}
}
}
if (hasUsefulIndexForKey) {
// Check 2.iii and 2.iv. Make sure no null entries in the sharding index
// and that there is a useful, non-multikey index available
BSONObjBuilder checkShardingIndexCmd;
checkShardingIndexCmd.append("checkShardingIndex", nss.ns());
checkShardingIndexCmd.append("keyPattern", proposedKey);
if (!conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res)) {
errmsg = res["errmsg"].str();
conn.done();
return false;
}
} else if (conn->count(nss.ns()) != 0) {
// 4. if no useful index, and collection is non-empty, fail
errmsg = str::stream() << "please create an index that starts with the "
<< "shard key before sharding.";
result.append("proposedKey", proposedKey);
result.append("curIndexes", indexes);
conn.done();
return false;
} else {
// 5. If no useful index exists, and collection empty, create one on proposedKey.
// Only need to call ensureIndex on primary shard, since indexes get copied to
// receiving shard whenever a migrate occurs.
// If the collection has a default collation, explicitly send the simple
// collation as part of the createIndex request.
BSONObj collation =
!defaultCollation.isEmpty() ? CollationSpec::kSimpleSpec : BSONObj();
auto createIndexesCmd =
makeCreateIndexesCmd(nss, proposedKey, collation, careAboutUnique);
const auto swResponse = primaryShard->runCommandWithFixedRetryAttempts(
opCtx,
ReadPreferenceSetting(ReadPreference::PrimaryOnly),
nss.db().toString(),
createIndexesCmd,
Shard::RetryPolicy::kNotIdempotent);
auto createIndexesStatus = swResponse.getStatus();
if (createIndexesStatus.isOK()) {
const auto response = swResponse.getValue();
createIndexesStatus = (!response.commandStatus.isOK())
? response.commandStatus
: response.writeConcernStatus;
}
if (!createIndexesStatus.isOK()) {
errmsg = str::stream() << "ensureIndex failed to create index on "
<< "primary shard: " << createIndexesStatus.reason();
conn.done();
return false;
}
}
bool isEmpty = (conn->count(nss.ns()) == 0);
conn.done();
// Pre-splitting:
// For new collections which use hashed shard keys, we can pre-split the
// range of possible hashes into a large number of chunks, and distribute them
// evenly at creation time. Until we design a better initialization scheme, the
// safest way to pre-split is to
// 1. make one big chunk for each shard
// 2. move them one at a time
// 3. split the big chunks to achieve the desired total number of initial chunks
std::vector<BSONObj> initSplits;  // there will be at most numShards - 1 of these
std::vector<BSONObj> allSplits;   // all of the initial desired split points
// only pre-split when using a hashed shard key and collection is still empty
if (isHashedShardKey && isEmpty) {
if (numChunks <= 0) {
// default number of initial chunks
numChunks = 2 * numShards;
}
// hashes are signed, 64-bit ints. So we divide the range (-MIN long, +MAX long)
// into intervals of size (2^64/numChunks) and create split points at the
// boundaries. The logic below ensures that initial chunks are all
// symmetric around 0.
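// For example, with numChunks = 4 the split points land near -2^62, 0 and
// +2^62, yielding four chunks symmetric around zero.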
long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2;
long long current = 0;
if (numChunks % 2 == 0) {
allSplits.push_back(BSON(proposedKey.firstElementFieldName() << current));
current += intervalSize;
} else {
current += intervalSize / 2;
}
for (int i = 0; i < (numChunks - 1) / 2; i++) {
allSplits.push_back(BSON(proposedKey.firstElementFieldName() << current));
allSplits.push_back(BSON(proposedKey.firstElementFieldName() << -current));
current += intervalSize;
}
sort(allSplits.begin(),
allSplits.end(),
SimpleBSONObjComparator::kInstance.makeLessThan());
// 1. the initial splits define the "big chunks" that we will subdivide later
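// For instance, with numChunks = 4 and numShards = 2 only the middle split
// point (0) is selected, leaving one big chunk per shard.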
int lastIndex = -1;
for (int i = 1; i < numShards; i++) {
if (lastIndex < (i * numChunks) / numShards - 1) {
lastIndex = (i * numChunks) / numShards - 1;
initSplits.push_back(allSplits[lastIndex]);
}
}
} else if (numChunks > 0) {
conn.done();
return appendCommandStatus(
result,
{ErrorCodes::InvalidOptions,
str::stream() << (!isHashedShardKey ? "numInitialChunks is not supported "
"when the shard key is not hashed."
: "numInitialChunks is not supported "
"when the collection is not empty.")});
}
LOG(0) << "CMD: shardcollection: " << cmdObj;
audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, careAboutUnique);
uassertStatusOK(catalogClient->shardCollection(opCtx,
nss.ns(),
proposedShardKey,
defaultCollation,
careAboutUnique,
initSplits,
std::set<ShardId>{}));
result << "collectionsharded" << nss.ns();
// Make sure the cached metadata for the collection knows that we are now sharded
catalogCache->invalidateShardedCollection(nss);
// Only initially move chunks when using a hashed shard key
if (isHashedShardKey && isEmpty) {
routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
uassert(ErrorCodes::ConflictingOperationInProgress,
"Collection was successfully written as sharded but got dropped before it "
"could be evenly distributed",
routingInfo.cm());
auto chunkManager = routingInfo.cm();
const auto chunkMap = chunkManager->chunkMap();
// 2. Move and commit each "big chunk" to a different shard.
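// Destination shards are picked round-robin (i % numShards) over the
// ordered chunk map.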
int i = 0;
for (ChunkMap::const_iterator c = chunkMap.begin(); c != chunkMap.end(); ++c, ++i) {
const ShardId& shardId = shardIds[i % numShards];
const auto toStatus = shardRegistry->getShard(opCtx, shardId);
if (!toStatus.isOK()) {
continue;
}
const auto to = toStatus.getValue();
auto chunk = c->second;
// Can't move chunk to shard it's already on
if (to->getId() == chunk->getShardId()) {
continue;
}
ChunkType chunkType;
chunkType.setNS(nss.ns());
chunkType.setMin(chunk->getMin());
chunkType.setMax(chunk->getMax());
chunkType.setShard(chunk->getShardId());
chunkType.setVersion(chunkManager->getVersion());
Status moveStatus = configsvr_client::moveChunk(
opCtx,
chunkType,
to->getId(),
Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
MigrationSecondaryThrottleOptions::create(
MigrationSecondaryThrottleOptions::kOff),
true);
if (!moveStatus.isOK()) {
warning() << "couldn't move chunk " << redact(chunk->toString()) << " to shard "
<< *to << " while sharding collection " << nss.ns()
<< causedBy(redact(moveStatus));
}
}
if (allSplits.empty()) {
return true;
}
// Reload the config info, after all the migrations
catalogCache->invalidateShardedCollection(nss);
routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
uassert(ErrorCodes::ConflictingOperationInProgress,
"Collection was successfully written as sharded but got dropped before it "
"could be evenly distributed",
routingInfo.cm());
chunkManager = routingInfo.cm();
// 3. Subdivide the big chunks by splitting at each of the points in "allSplits"
// that we haven't already split by.
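// Split points are batched per owning chunk, so each call to
// splitChunkAtMultiplePoints below targets exactly one chunk.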
auto currentChunk =
chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[0]);
std::vector<BSONObj> subSplits;
for (unsigned i = 0; i <= allSplits.size(); i++) {
if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) {
if (!subSplits.empty()) {
auto splitStatus = shardutil::splitChunkAtMultiplePoints(
opCtx,
currentChunk->getShardId(),
nss,
chunkManager->getShardKeyPattern(),
chunkManager->getVersion(),
ChunkRange(currentChunk->getMin(), currentChunk->getMax()),
subSplits);
if (!splitStatus.isOK()) {
warning() << "couldn't split chunk " << redact(currentChunk->toString())
<< " while sharding collection " << nss.ns()
<< causedBy(redact(splitStatus.getStatus()));
}
subSplits.clear();
}
if (i < allSplits.size()) {
currentChunk =
chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[i]);
}
} else {
BSONObj splitPoint(allSplits[i]);
// Do not split on the boundaries
if (currentChunk->getMin().woCompare(splitPoint) == 0) {
continue;
}
subSplits.push_back(splitPoint);
}
}
}
return true;
}
} shardCollectionCmd;
} // namespace
} // namespace mongo