| author | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-07-24 16:25:24 -0400 |
| --- | --- | --- |
| committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2017-07-31 18:12:03 -0400 |
| commit | 6300b3bd4ad9cd238a02bdb8ca681a447913f1af (patch) | |
| tree | f23f74c7729c6177b63f34261f473cec79fcddae /src | |
| parent | 6b34ad314773be3f3214d6f4186fde6ec0c12e39 (diff) | |
| download | mongo-6300b3bd4ad9cd238a02bdb8ca681a447913f1af.tar.gz | |
SERVER-30292 split mind-bogglingly humongous shardCollection command body into a series of helper functions
Diffstat (limited to 'src')
| -rw-r--r-- | src/mongo/db/s/config/configsvr_shard_collection_command.cpp | 1054 |
1 file changed, 551 insertions, 503 deletions
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp index e2fc9203c51..65112eec4ff 100644 --- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp +++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp @@ -122,6 +122,514 @@ BSONObj makeCreateIndexesCmd(const NamespaceString& nss, } /** + * Validates the options specified in the request. + * + * WARNING: After validating the request's collation, replaces it with the collection default + * collation. + */ +void validateAndDeduceFullRequestOptions(OperationContext* opCtx, + const NamespaceString& nss, + const ShardKeyPattern& shardKeyPattern, + int numShards, + ScopedDbConnection& conn, + ConfigsvrShardCollection* request) { + uassert( + ErrorCodes::InvalidOptions, "cannot have empty shard key", !request->getKey().isEmpty()); + + // Ensure the proposed shard key is valid. + uassert(ErrorCodes::InvalidOptions, + str::stream() + << "Unsupported shard key pattern " + << shardKeyPattern.toString() + << ". Pattern must either be a single hashed field, or a list of ascending fields", + shardKeyPattern.isValid()); + + // Ensure that hashed and unique are not both set. + uassert(ErrorCodes::InvalidOptions, + "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on " + "the hashed field by declaring an additional (non-hashed) unique index on the field.", + !shardKeyPattern.isHashedPattern() || !request->getUnique()); + + // Ensure the namespace is valid. + uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem()); + + // Ensure the collation is valid. Currently we only allow the simple collation. + bool simpleCollationSpecified = false; + if (request->getCollation()) { + auto& collation = *request->getCollation(); + auto collator = uassertStatusOK( + CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation)); + uassert(ErrorCodes::BadValue, + str::stream() << "The collation for shardCollection must be {locale: 'simple'}, " + << "but found: " + << collation, + !collator); + simpleCollationSpecified = true; + } + + // Ensure numInitialChunks is within valid bounds. + // Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000 + // chunks in total to limit the amount of memory this command consumes so there is less + // danger of an OOM error. + const int maxNumInitialChunksForShards = numShards * 8192; + const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption + int numChunks = request->getNumInitialChunks(); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "numInitialChunks cannot be more than either: " + << maxNumInitialChunksForShards + << ", 8192 * number of shards; or " + << maxNumInitialChunksTotal, + numChunks <= maxNumInitialChunksForShards && numChunks <= maxNumInitialChunksTotal); + + // Retrieve the collection metadata in order to verify that it is legal to shard this + // collection. + BSONObj res; + { + std::list<BSONObj> all = + conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + if (!all.empty()) { + res = all.front().getOwned(); + } + } + + BSONObj defaultCollation; + + if (!res.isEmpty()) { + // Check that namespace is not a view. 
+ { + std::string namespaceType; + uassertStatusOK(bsonExtractStringField(res, "type", &namespaceType)); + uassert(ErrorCodes::CommandNotSupportedOnView, + "Views cannot be sharded.", + namespaceType != "view"); + } + + BSONObj collectionOptions; + if (res["options"].type() == BSONType::Object) { + collectionOptions = res["options"].Obj(); + } + + // Check that collection is not capped. + uassert(ErrorCodes::InvalidOptions, + "can't shard a capped collection", + !collectionOptions["capped"].trueValue()); + + // Get collection default collation. + BSONElement collationElement; + auto status = bsonExtractTypedField( + collectionOptions, "collation", BSONType::Object, &collationElement); + if (status.isOK()) { + defaultCollation = collationElement.Obj().getOwned(); + uassert(ErrorCodes::BadValue, + "Default collation in collection metadata cannot be empty.", + !defaultCollation.isEmpty()); + } else if (status != ErrorCodes::NoSuchKey) { + uassertStatusOK(status); + } + + // If the collection has a non-simple default collation but the user did not specify the + // simple collation explicitly, return an error. + uassert(ErrorCodes::BadValue, + str::stream() << "Collection has default collation: " + << collectionOptions["collation"] + << ". Must specify collation {locale: 'simple'}", + defaultCollation.isEmpty() || simpleCollationSpecified); + } + + // Once the request's collation has been validated as simple or unset, replace it with the + // deduced collection default collation. + request->setCollation(defaultCollation.getOwned()); +} + +/** + * Throws an exception if the collection is already sharded with different options. + * + * Returns true if the collection is already sharded with the same options, and false if the + * collection is not sharded. + */ +bool checkIfAlreadyShardedWithSameOptions(OperationContext* opCtx, + const NamespaceString& nss, + const ConfigsvrShardCollection& request) { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + + // We must reload the collection while metadata commands are split between mongos and the + // config servers. + catalogCache->invalidateShardedCollection(nss); + auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + + // If the collection is already sharded, fail if the deduced options in this request do not + // match the options the collection was originally sharded with. + if (routingInfo.cm()) { + auto existingColl = + uassertStatusOK(Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss.ns())) + .value; + + CollectionType requestedOptions; + requestedOptions.setNs(nss); + requestedOptions.setKeyPattern(KeyPattern(request.getKey())); + requestedOptions.setDefaultCollation(*request.getCollation()); + requestedOptions.setUnique(request.getUnique()); + + uassert(ErrorCodes::AlreadyInitialized, + str::stream() << "sharding already enabled for collection " << nss.ns() + << " with options " + << existingColl.toString(), + requestedOptions.hasSameOptions(existingColl)); + + // If the options do match, we can immediately return success. + return true; + } + + // Not currently sharded. + return false; +} + +/** + * Compares the proposed shard key with the collection's existing indexes on the primary shard to + * ensure they are a legal combination. + * + * If the collection is empty and no index on the shard key exists, creates the required index. 
+ */ +void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& proposedKey, + const ShardKeyPattern& shardKeyPattern, + const std::shared_ptr<Shard> primaryShard, + ScopedDbConnection& conn, + const ConfigsvrShardCollection& request) { + // The proposed shard key must be validated against the set of existing indexes. + // In particular, we must ensure the following constraints + // + // 1. All existing unique indexes, except those which start with the _id index, + // must contain the proposed key as a prefix (uniqueness of the _id index is + // ensured by the _id generation process or guaranteed by the user). + // + // 2. If the collection is not empty, there must exist at least one index that + // is "useful" for the proposed key. A "useful" index is defined as follows + // Useful Index: + // i. contains proposedKey as a prefix + // ii. is not a sparse index, partial index, or index with a non-simple collation + // iii. contains no null values + // iv. is not multikey (maybe lift this restriction later) + // v. if a hashed index, has default seed (lift this restriction later) + // + // 3. If the proposed shard key is specified as unique, there must exist a useful, + // unique index exactly equal to the proposedKey (not just a prefix). + // + // After validating these constraint: + // + // 4. If there is no useful index, and the collection is non-empty, we + // must fail. + // + // 5. If the collection is empty, and it's still possible to create an index + // on the proposed key, we go ahead and do so. + + std::list<BSONObj> indexes = conn->getIndexSpecs(nss.ns()); + + // 1. Verify consistency with existing unique indexes + for (const auto& idx : indexes) { + BSONObj currentKey = idx["key"].embeddedObject(); + bool isUnique = idx["unique"].trueValue(); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "can't shard collection '" << nss.ns() << "' with unique index on " + << currentKey + << " and proposed shard key " + << proposedKey + << ". Uniqueness can't be maintained unless shard key is a prefix", + !isUnique || shardKeyPattern.isUniqueIndexCompatible(currentKey)); + } + + // 2. Check for a useful index + bool hasUsefulIndexForKey = false; + for (const auto& idx : indexes) { + BSONObj currentKey = idx["key"].embeddedObject(); + // Check 2.i. and 2.ii. + if (!idx["sparse"].trueValue() && idx["filter"].eoo() && idx["collation"].eoo() && + proposedKey.isPrefixOf(currentKey, SimpleBSONElementComparator::kInstance)) { + // We can't currently use hashed indexes with a non-default hash seed + // Check v. + // Note that this means that, for sharding, we only support one hashed index + // per field per collection. + uassert(ErrorCodes::InvalidOptions, + str::stream() << "can't shard collection " << nss.ns() + << " with hashed shard key " + << proposedKey + << " because the hashed index uses a non-default seed of " + << idx["seed"].numberInt(), + !shardKeyPattern.isHashedPattern() || idx["seed"].eoo() || + idx["seed"].numberInt() == BSONElementHasher::DEFAULT_HASH_SEED); + hasUsefulIndexForKey = true; + } + } + + // 3. If proposed key is required to be unique, additionally check for exact match. 
+ + if (hasUsefulIndexForKey && request.getUnique()) { + BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey); + BSONObj eqQueryResult; + + for (const auto& idx : indexes) { + if (SimpleBSONObjComparator::kInstance.evaluate(idx["key"].embeddedObject() == + proposedKey)) { + eqQueryResult = idx; + break; + } + } + + if (eqQueryResult.isEmpty()) { + // If no exact match, index not useful, but still possible to create one later + hasUsefulIndexForKey = false; + } else { + bool isExplicitlyUnique = eqQueryResult["unique"].trueValue(); + BSONObj currKey = eqQueryResult["key"].embeddedObject(); + bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id"); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "can't shard collection " << nss.ns() << ", " << proposedKey + << " index not unique, and unique index explicitly specified", + isExplicitlyUnique || isCurrentID); + } + } + + if (hasUsefulIndexForKey) { + // Check 2.iii and 2.iv. Make sure no null entries in the sharding index + // and that there is a useful, non-multikey index available + BSONObjBuilder checkShardingIndexCmd; + checkShardingIndexCmd.append("checkShardingIndex", nss.ns()); + checkShardingIndexCmd.append("keyPattern", proposedKey); + BSONObj res; + auto success = conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res); + uassert(ErrorCodes::OperationFailed, res["errmsg"].str(), success); + } else if (conn->count(nss.ns()) != 0) { + // 4. if no useful index, and collection is non-empty, fail + uasserted(ErrorCodes::InvalidOptions, + "Please create an index that starts with the proposed shard key before " + "sharding the collection"); + } else { + // 5. If no useful index exists, and collection empty, create one on proposedKey. + // Only need to call ensureIndex on primary shard, since indexes get copied to + // receiving shard whenever a migrate occurs. + // If the collection has a default collation, explicitly send the simple + // collation as part of the createIndex request. + BSONObj collation = + !request.getCollation()->isEmpty() ? CollationSpec::kSimpleSpec : BSONObj(); + auto createIndexesCmd = + makeCreateIndexesCmd(nss, proposedKey, collation, request.getUnique()); + + const auto swResponse = primaryShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + nss.db().toString(), + createIndexesCmd, + Shard::RetryPolicy::kNotIdempotent); + auto createIndexesStatus = swResponse.getStatus(); + if (createIndexesStatus.isOK()) { + const auto response = swResponse.getValue(); + createIndexesStatus = (!response.commandStatus.isOK()) ? response.commandStatus + : response.writeConcernStatus; + } + uassertStatusOK(createIndexesStatus); + } +} + +/** + * For new collections which use hashed shard keys, we can can pre-split the range of possible + * hashes into a large number of chunks, and distribute them evenly at creation time. Until we + * design a better initialization scheme, the safest way to pre-split is to make one big chunk for + * each shard and migrate them one at a time. + * + * Populates 'initSplits' with the split points to use on the primary shard to produce the initial + * "big chunks." + * Also populates 'allSplits' with the additional split points to use on the "big chunks" after the + * "big chunks" have been spread evenly across shards through migrations. 
+ */ +void determinePresplittingPoints(OperationContext* opCtx, + int numShards, + bool isEmpty, + const BSONObj& proposedKey, + const ShardKeyPattern& shardKeyPattern, + const ConfigsvrShardCollection& request, + std::vector<BSONObj>* initSplits, + std::vector<BSONObj>* allSplits) { + auto numChunks = request.getNumInitialChunks(); + + if (request.getInitialSplitPoints()) { + *initSplits = std::move(*request.getInitialSplitPoints()); + return; + } + + if (shardKeyPattern.isHashedPattern() && isEmpty) { + // If initial split points are not specified, only pre-split when using a hashed shard + // key and the collection is empty + if (numChunks <= 0) { + // default number of initial chunks + numChunks = 2 * numShards; + } + + // hashes are signed, 64-bit ints. So we divide the range (-MIN long, +MAX long) + // into intervals of size (2^64/numChunks) and create split points at the + // boundaries. The logic below ensures that initial chunks are all + // symmetric around 0. + long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2; + long long current = 0; + + if (numChunks % 2 == 0) { + allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current)); + current += intervalSize; + } else { + current += intervalSize / 2; + } + + for (int i = 0; i < (numChunks - 1) / 2; i++) { + allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current)); + allSplits->push_back(BSON(proposedKey.firstElementFieldName() << -current)); + current += intervalSize; + } + + sort(allSplits->begin(), + allSplits->end(), + SimpleBSONObjComparator::kInstance.makeLessThan()); + + // The initial splits define the "big chunks" that we will subdivide later. + int lastIndex = -1; + for (int i = 1; i < numShards; i++) { + if (lastIndex < (i * numChunks) / numShards - 1) { + lastIndex = (i * numChunks) / numShards - 1; + initSplits->push_back(allSplits->at(lastIndex)); + } + } + } else if (numChunks > 0) { + uasserted(ErrorCodes::InvalidOptions, + str::stream() << (!shardKeyPattern.isHashedPattern() + ? "numInitialChunks is not supported " + "when the shard key is not hashed." + : "numInitialChunks is not supported " + "when the collection is not empty.")); + } +} + +/** + * Migrates the initial "big chunks" from the primary shard to spread them evenly across the shards. + * + * If 'allSplits' is not empty, additionally splits each "big chunk" into smaller chunks using the + * points in 'allSplits.' + */ +void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx, + const NamespaceString& nss, + int numShards, + const std::vector<ShardId>& shardIds, + bool isEmpty, + const ShardKeyPattern& shardKeyPattern, + const std::vector<BSONObj>& allSplits) { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + + if (!shardKeyPattern.isHashedPattern()) { + // Only initially move chunks when using a hashed shard key. + return; + } + + if (!isEmpty) { + // If the collection is not empty, rely on the balancer to migrate the chunks. + return; + } + + auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::ConflictingOperationInProgress, + "Collection was successfully written as sharded but got dropped before it " + "could be evenly distributed", + routingInfo.cm()); + + auto chunkManager = routingInfo.cm(); + + // Move and commit each "big chunk" to a different shard. 
+ int i = 0; + for (auto chunk : chunkManager->chunks()) { + const ShardId& shardId = shardIds[i++ % numShards]; + const auto toStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); + if (!toStatus.isOK()) { + continue; + } + const auto to = toStatus.getValue(); + + // Can't move chunk to shard it's already on + if (to->getId() == chunk->getShardId()) { + continue; + } + + ChunkType chunkType; + chunkType.setNS(nss.ns()); + chunkType.setMin(chunk->getMin()); + chunkType.setMax(chunk->getMax()); + chunkType.setShard(chunk->getShardId()); + chunkType.setVersion(chunkManager->getVersion()); + + Status moveStatus = configsvr_client::moveChunk( + opCtx, + chunkType, + to->getId(), + Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), + MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff), + true); + if (!moveStatus.isOK()) { + warning() << "couldn't move chunk " << redact(chunk->toString()) << " to shard " << *to + << " while sharding collection " << nss.ns() << causedBy(redact(moveStatus)); + } + } + + if (allSplits.empty()) { + return; + } + + // Reload the config info, after all the migrations + catalogCache->invalidateShardedCollection(nss); + routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::ConflictingOperationInProgress, + "Collection was successfully written as sharded but got dropped before it " + "could be evenly distributed", + routingInfo.cm()); + chunkManager = routingInfo.cm(); + + // Subdivide the big chunks by splitting at each of the points in "allSplits" + // that we haven't already split by. + auto currentChunk = chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[0]); + + std::vector<BSONObj> subSplits; + for (unsigned i = 0; i <= allSplits.size(); i++) { + if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) { + if (!subSplits.empty()) { + auto splitStatus = shardutil::splitChunkAtMultiplePoints( + opCtx, + currentChunk->getShardId(), + nss, + chunkManager->getShardKeyPattern(), + chunkManager->getVersion(), + ChunkRange(currentChunk->getMin(), currentChunk->getMax()), + subSplits); + if (!splitStatus.isOK()) { + warning() << "couldn't split chunk " << redact(currentChunk->toString()) + << " while sharding collection " << nss.ns() + << causedBy(redact(splitStatus.getStatus())); + } + + subSplits.clear(); + } + + if (i < allSplits.size()) { + currentChunk = chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[i]); + } + } else { + BSONObj splitPoint(allSplits[i]); + + // Do not split on the boundaries + if (currentChunk->getMin().woCompare(splitPoint) == 0) { + continue; + } + + subSplits.push_back(splitPoint); + } + } +} + +/** * Internal sharding command run on config servers to add a shard to the cluster. 
*/ class ConfigSvrShardCollectionCommand : public BasicCommand { @@ -170,561 +678,101 @@ public: serverGlobalParams.clusterRole == ClusterRole::ConfigServer); const NamespaceString nss(parseNs(dbname, cmdObj)); - auto shardCollRequest = ConfigsvrShardCollection::parse( + auto request = ConfigsvrShardCollection::parse( IDLParserErrorContext("ConfigsvrShardCollection"), cmdObj); - auto const catalogClient = Grid::get(opCtx)->catalogClient(); auto const catalogManager = ShardingCatalogManager::get(opCtx); - auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); auto const catalogCache = Grid::get(opCtx)->catalogCache(); - // Lock the collection to prevent older mongos instances from trying to shard or drop it at - // the same time + // Lock the collection to prevent older mongos instances from trying to shard or drop it + // concurrently. boost::optional<DistLockManager::ScopedDistLock> scopedDistLock( - uassertStatusOK(catalogClient->getDistLockManager()->lock( + uassertStatusOK(Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock( opCtx, nss.ns(), "shardCollection", DistLockManager::kDefaultLockTimeout))); // Ensure sharding is allowed on the database - auto dbType = uassertStatusOK(catalogClient->getDatabase(opCtx, nss.db().toString())).value; + auto dbType = uassertStatusOK(Grid::get(opCtx)->catalogClient()->getDatabase( + opCtx, nss.db().toString())) + .value; uassert(ErrorCodes::IllegalOperation, str::stream() << "sharding not enabled for db " << nss.db(), dbType.getSharded()); - // Ensure that the collection is not already sharded with different options. - - // We must reload the collection while metadata commands are split between mongos and the - // config servers. - catalogCache->invalidateShardedCollection(nss); - auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - - if (shardCollRequest.getKey().isEmpty()) { - return appendCommandStatus( - result, - {ErrorCodes::InvalidOptions, str::stream() << "Cannot have an empty shard key"}); - } + // Get variables required throughout this command. - auto proposedKey(shardCollRequest.getKey().getOwned()); + auto proposedKey(request.getKey().getOwned()); ShardKeyPattern shardKeyPattern(proposedKey); - if (!shardKeyPattern.isValid()) { - return appendCommandStatus(result, - {ErrorCodes::InvalidOptions, - str::stream() << "Unsupported shard key pattern " - << shardKeyPattern.toString() - << ". Pattern must either be a single hashed " - "field, or a list of ascending fields"}); - } - - bool isHashedShardKey = shardKeyPattern.isHashedPattern(); - bool careAboutUnique = shardCollRequest.getUnique(); - - if (isHashedShardKey && careAboutUnique) { - dassert(proposedKey.nFields() == 1); - return appendCommandStatus(result, - {ErrorCodes::InvalidOptions, - "Hashed shard keys cannot be declared unique. It's " - "possible to ensure uniqueness on the hashed field by " - "declaring an additional (non-hashed) unique index on the " - "field."}); - } - - uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem()); - - // Ensure that the collation is valid. Currently we only allow the simple collation. 
- bool simpleCollationSpecified = false; - if (shardCollRequest.getCollation()) { - auto& collation = *shardCollRequest.getCollation(); - auto collator = - CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation); - if (!collator.getStatus().isOK()) { - return appendCommandStatus(result, collator.getStatus()); - } - if (collator.getValue()) { - return appendCommandStatus( - result, - {ErrorCodes::BadValue, - str::stream() - << "The collation for shardCollection must be {locale: 'simple'}, " - << "but found: " - << collation}); - } - simpleCollationSpecified = true; - } std::vector<ShardId> shardIds; - shardRegistry->getAllShardIds(&shardIds); - + Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds); const int numShards = shardIds.size(); - // Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000 - // chunks in total to limit the amount of memory this command consumes so there is less - // danger of an OOM error. - const int maxNumInitialChunksForShards = numShards * 8192; - const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption - int numChunks = shardCollRequest.getNumInitialChunks(); - if (numChunks > maxNumInitialChunksForShards || numChunks > maxNumInitialChunksTotal) { - return appendCommandStatus(result, - {ErrorCodes::InvalidOptions, - str::stream() - << "numInitialChunks cannot be more than either: " - << maxNumInitialChunksForShards - << ", 8192 * number of shards; or " - << maxNumInitialChunksTotal}); - } - - // The rest of the checks require a connection to the primary db - auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbType.getPrimary())); + auto primaryShard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbType.getPrimary())); ScopedDbConnection conn(primaryShard->getConnString()); ON_BLOCK_EXIT([&conn] { conn.done(); }); - // Retrieve the collection metadata in order to verify that it is legal to shard this - // collection. - BSONObj res; - { - std::list<BSONObj> all = - conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - if (!all.empty()) { - res = all.front().getOwned(); - } - } - - BSONObj defaultCollation; - - if (!res.isEmpty()) { - // Check that namespace is not a view. - { - std::string namespaceType; - auto status = bsonExtractStringField(res, "type", &namespaceType); - if (!status.isOK()) { - return appendCommandStatus(result, status); - } - - if (namespaceType == "view") { - return appendCommandStatus( - result, - {ErrorCodes::CommandNotSupportedOnView, "Views cannot be sharded."}); - } - } - - BSONObj collectionOptions; - if (res["options"].type() == BSONType::Object) { - collectionOptions = res["options"].Obj(); - } - - // Check that collection is not capped. - if (collectionOptions["capped"].trueValue()) { - return appendCommandStatus( - result, {ErrorCodes::InvalidOptions, "can't shard a capped collection"}); - } - - // Get collection default collation. 
- { - BSONElement collationElement; - auto status = bsonExtractTypedField( - collectionOptions, "collation", BSONType::Object, &collationElement); - if (status.isOK()) { - defaultCollation = collationElement.Obj().getOwned(); - if (defaultCollation.isEmpty()) { - return appendCommandStatus( - result, - {ErrorCodes::BadValue, - "Default collation in collection metadata cannot be empty."}); - } - } else if (status != ErrorCodes::NoSuchKey) { - return appendCommandStatus( - result, - {status.code(), - str::stream() - << "Could not parse default collation in collection metadata " - << causedBy(status)}); - } - } + // Step 1. + validateAndDeduceFullRequestOptions(opCtx, nss, shardKeyPattern, numShards, conn, &request); - // If the collection has a non-simple default collation but the user did not specify the - // simple collation explicitly, return an error. - if (!defaultCollation.isEmpty() && !simpleCollationSpecified) { - return appendCommandStatus(result, - {ErrorCodes::BadValue, - str::stream() - << "Collection has default collation: " - << collectionOptions["collation"] - << ". Must specify collation {locale: 'simple'}"}); - } - } + // The collation option should have been set to the collection default collation after being + // validated. + invariant(request.getCollation()); - // If the collection is already sharded, fail if the deduced options in this request do not - // match the options the collection was originally sharded with. - if (routingInfo.cm()) { - auto existingColl = - uassertStatusOK(catalogClient->getCollection(opCtx, nss.ns())).value; - - CollectionType requestedOptions; - requestedOptions.setNs(nss); - requestedOptions.setKeyPattern(KeyPattern(proposedKey)); - requestedOptions.setDefaultCollation(defaultCollation); - requestedOptions.setUnique(careAboutUnique); - - uassert(ErrorCodes::AlreadyInitialized, - str::stream() << "sharding already enabled for collection " << nss.ns() - << " with options " - << existingColl.toString(), - requestedOptions.hasSameOptions(existingColl)); - - // If the options do match, we can immediately return success. + // Step 2. + if (checkIfAlreadyShardedWithSameOptions(opCtx, nss, request)) { return true; } - // The proposed shard key must be validated against the set of existing indexes. - // In particular, we must ensure the following constraints - // - // 1. All existing unique indexes, except those which start with the _id index, - // must contain the proposed key as a prefix (uniqueness of the _id index is - // ensured by the _id generation process or guaranteed by the user). - // - // 2. If the collection is not empty, there must exist at least one index that - // is "useful" for the proposed key. A "useful" index is defined as follows - // Useful Index: - // i. contains proposedKey as a prefix - // ii. is not a sparse index, partial index, or index with a non-simple collation - // iii. contains no null values - // iv. is not multikey (maybe lift this restriction later) - // v. if a hashed index, has default seed (lift this restriction later) - // - // 3. If the proposed shard key is specified as unique, there must exist a useful, - // unique index exactly equal to the proposedKey (not just a prefix). - // - // After validating these constraint: - // - // 4. If there is no useful index, and the collection is non-empty, we - // must fail. - // - // 5. If the collection is empty, and it's still possible to create an index - // on the proposed key, we go ahead and do so. 
- std::list<BSONObj> indexes = conn->getIndexSpecs(nss.ns()); - - // 1. Verify consistency with existing unique indexes - - for (const auto& idx : indexes) { - BSONObj currentKey = idx["key"].embeddedObject(); - bool isUnique = idx["unique"].trueValue(); - if (isUnique && !shardKeyPattern.isUniqueIndexCompatible(currentKey)) { - return appendCommandStatus( - result, - {ErrorCodes::InvalidOptions, - str::stream() - << "can't shard collection '" - << nss.ns() - << "' with unique index on " - << currentKey - << " and proposed shard key " - << proposedKey - << ". Uniqueness can't be maintained unless shard key is a prefix"}); - } - } - - // 2. Check for a useful index - bool hasUsefulIndexForKey = false; - for (const auto& idx : indexes) { - BSONObj currentKey = idx["key"].embeddedObject(); - // Check 2.i. and 2.ii. - if (!idx["sparse"].trueValue() && idx["filter"].eoo() && idx["collation"].eoo() && - proposedKey.isPrefixOf(currentKey, SimpleBSONElementComparator::kInstance)) { - // We can't currently use hashed indexes with a non-default hash seed - // Check v. - // Note that this means that, for sharding, we only support one hashed index - // per field per collection. - if (isHashedShardKey && !idx["seed"].eoo() && - idx["seed"].numberInt() != BSONElementHasher::DEFAULT_HASH_SEED) { - return appendCommandStatus( - result, - {ErrorCodes::InvalidOptions, - str::stream() << "can't shard collection " << nss.ns() - << " with hashed shard key " - << proposedKey - << " because the hashed index uses a non-default seed of " - << idx["seed"].numberInt()}); - } - hasUsefulIndexForKey = true; - } - } - - // 3. If proposed key is required to be unique, additionally check for exact match. - - if (hasUsefulIndexForKey && careAboutUnique) { - BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey); - BSONObj eqQueryResult; - - for (const auto& idx : indexes) { - if (SimpleBSONObjComparator::kInstance.evaluate(idx["key"].embeddedObject() == - proposedKey)) { - eqQueryResult = idx; - break; - } - } - - if (eqQueryResult.isEmpty()) { - // If no exact match, index not useful, but still possible to create one later - hasUsefulIndexForKey = false; - } else { - bool isExplicitlyUnique = eqQueryResult["unique"].trueValue(); - BSONObj currKey = eqQueryResult["key"].embeddedObject(); - bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id"); - - if (!isExplicitlyUnique && !isCurrentID) { - return appendCommandStatus( - result, - {ErrorCodes::InvalidOptions, - str::stream() - << "can't shard collection " - << nss.ns() - << ", " - << proposedKey - << " index not unique, and unique index explicitly specified"}); - } - } - } - - if (hasUsefulIndexForKey) { - // Check 2.iii and 2.iv. Make sure no null entries in the sharding index - // and that there is a useful, non-multikey index available - BSONObjBuilder checkShardingIndexCmd; - checkShardingIndexCmd.append("checkShardingIndex", nss.ns()); - checkShardingIndexCmd.append("keyPattern", proposedKey); - - if (!conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res)) { - return appendCommandStatus(result, - {ErrorCodes::OperationFailed, res["errmsg"].str()}); - } - } else if (conn->count(nss.ns()) != 0) { - // 4. if no useful index, and collection is non-empty, fail - return appendCommandStatus( - result, - {ErrorCodes::InvalidOptions, - "Please create an index that starts with the proposed shard key before " - "sharding the collection"}); - } else { - // 5. If no useful index exists, and collection empty, create one on proposedKey. 
- // Only need to call ensureIndex on primary shard, since indexes get copied to - // receiving shard whenever a migrate occurs. - // If the collection has a default collation, explicitly send the simple - // collation as part of the createIndex request. - BSONObj collation = - !defaultCollation.isEmpty() ? CollationSpec::kSimpleSpec : BSONObj(); - auto createIndexesCmd = - makeCreateIndexesCmd(nss, proposedKey, collation, careAboutUnique); - - const auto swResponse = primaryShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting(ReadPreference::PrimaryOnly), - nss.db().toString(), - createIndexesCmd, - Shard::RetryPolicy::kNotIdempotent); - auto createIndexesStatus = swResponse.getStatus(); - if (createIndexesStatus.isOK()) { - const auto response = swResponse.getValue(); - createIndexesStatus = (!response.commandStatus.isOK()) - ? response.commandStatus - : response.writeConcernStatus; - } - uassertStatusOK(createIndexesStatus); - } + // Step 3. + validateShardKeyAgainstExistingIndexes( + opCtx, nss, proposedKey, shardKeyPattern, primaryShard, conn, request); + // isEmpty is used by multiple steps below. bool isEmpty = (conn->count(nss.ns()) == 0); - // Pre-splitting: - // For new collections which use hashed shard keys, we can can pre-split the - // range of possible hashes into a large number of chunks, and distribute them - // evenly at creation time. Until we design a better initialization scheme, the - // safest way to pre-split is to - // 1. make one big chunk for each shard - // 2. move them one at a time - // 3. split the big chunks to achieve the desired total number of initial chunks - + // Step 4. std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these std::vector<BSONObj> allSplits; // all of the initial desired split points - - if (shardCollRequest.getInitialSplitPoints()) { - initSplits = std::move(*shardCollRequest.getInitialSplitPoints()); - } else if (isHashedShardKey && isEmpty) { - // If initial split points are not specified, only pre-split when using a hashed shard - // key and the collection is empty - - if (numChunks <= 0) { - // default number of initial chunks - numChunks = 2 * numShards; - } - - // hashes are signed, 64-bit ints. So we divide the range (-MIN long, +MAX long) - // into intervals of size (2^64/numChunks) and create split points at the - // boundaries. The logic below ensures that initial chunks are all - // symmetric around 0. - long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2; - long long current = 0; - - if (numChunks % 2 == 0) { - allSplits.push_back(BSON(proposedKey.firstElementFieldName() << current)); - current += intervalSize; - } else { - current += intervalSize / 2; - } - - for (int i = 0; i < (numChunks - 1) / 2; i++) { - allSplits.push_back(BSON(proposedKey.firstElementFieldName() << current)); - allSplits.push_back(BSON(proposedKey.firstElementFieldName() << -current)); - current += intervalSize; - } - - sort(allSplits.begin(), - allSplits.end(), - SimpleBSONObjComparator::kInstance.makeLessThan()); - - // 1. the initial splits define the "big chunks" that we will subdivide later - int lastIndex = -1; - for (int i = 1; i < numShards; i++) { - if (lastIndex < (i * numChunks) / numShards - 1) { - lastIndex = (i * numChunks) / numShards - 1; - initSplits.push_back(allSplits[lastIndex]); - } - } - } else if (numChunks > 0) { - return appendCommandStatus( - result, - {ErrorCodes::InvalidOptions, - str::stream() << (!isHashedShardKey ? 
"numInitialChunks is not supported " - "when the shard key is not hashed." - : "numInitialChunks is not supported " - "when the collection is not empty.")}); - } + determinePresplittingPoints(opCtx, + numShards, + isEmpty, + proposedKey, + shardKeyPattern, + request, + &initSplits, + &allSplits); LOG(0) << "CMD: shardcollection: " << cmdObj; - audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, careAboutUnique); + audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, request.getUnique()); // The initial chunks are distributed evenly across shards only if the initial split points // were specified in the request, i.e., by mapReduce. Otherwise, all the initial chunks are // placed on the primary shard, and may be distributed across shards through migrations // (below) if using a hashed shard key. - const bool distributeInitialChunks = - shardCollRequest.getInitialSplitPoints().is_initialized(); + const bool distributeInitialChunks = request.getInitialSplitPoints().is_initialized(); + // Step 5. Actually shard the collection. catalogManager->shardCollection(opCtx, nss.ns(), shardKeyPattern, - defaultCollation, - careAboutUnique, + *request.getCollation(), + request.getUnique(), initSplits, distributeInitialChunks); - - // Free the collection dist lock in order to allow the initial splits and moves below to - // proceed - scopedDistLock.reset(); - result << "collectionsharded" << nss.ns(); // Make sure the cached metadata for the collection knows that we are now sharded catalogCache->invalidateShardedCollection(nss); - // Only initially move chunks when using a hashed shard key - if (isHashedShardKey && isEmpty) { - routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - uassert(ErrorCodes::ConflictingOperationInProgress, - "Collection was successfully written as sharded but got dropped before it " - "could be evenly distributed", - routingInfo.cm()); - - auto chunkManager = routingInfo.cm(); - - // 2. Move and commit each "big chunk" to a different shard. - int i = 0; - for (auto chunk : chunkManager->chunks()) { - const ShardId& shardId = shardIds[i++ % numShards]; - const auto toStatus = shardRegistry->getShard(opCtx, shardId); - if (!toStatus.isOK()) { - continue; - } - const auto to = toStatus.getValue(); - - // Can't move chunk to shard it's already on - if (to->getId() == chunk->getShardId()) { - continue; - } - - ChunkType chunkType; - chunkType.setNS(nss.ns()); - chunkType.setMin(chunk->getMin()); - chunkType.setMax(chunk->getMax()); - chunkType.setShard(chunk->getShardId()); - chunkType.setVersion(chunkManager->getVersion()); - - Status moveStatus = configsvr_client::moveChunk( - opCtx, - chunkType, - to->getId(), - Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(), - MigrationSecondaryThrottleOptions::create( - MigrationSecondaryThrottleOptions::kOff), - true); - if (!moveStatus.isOK()) { - warning() << "couldn't move chunk " << redact(chunk->toString()) << " to shard " - << *to << " while sharding collection " << nss.ns() - << causedBy(redact(moveStatus)); - } - } - - if (allSplits.empty()) { - return true; - } + // Free the collection dist lock in order to allow the initial splits and moves below to + // proceed. 
+ scopedDistLock.reset(); - // Reload the config info, after all the migrations - catalogCache->invalidateShardedCollection(nss); - routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); - uassert(ErrorCodes::ConflictingOperationInProgress, - "Collection was successfully written as sharded but got dropped before it " - "could be evenly distributed", - routingInfo.cm()); - chunkManager = routingInfo.cm(); - - // 3. Subdivide the big chunks by splitting at each of the points in "allSplits" - // that we haven't already split by. - auto currentChunk = - chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[0]); - - std::vector<BSONObj> subSplits; - for (unsigned i = 0; i <= allSplits.size(); i++) { - if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) { - if (!subSplits.empty()) { - auto splitStatus = shardutil::splitChunkAtMultiplePoints( - opCtx, - currentChunk->getShardId(), - nss, - chunkManager->getShardKeyPattern(), - chunkManager->getVersion(), - ChunkRange(currentChunk->getMin(), currentChunk->getMax()), - subSplits); - if (!splitStatus.isOK()) { - warning() << "couldn't split chunk " << redact(currentChunk->toString()) - << " while sharding collection " << nss.ns() - << causedBy(redact(splitStatus.getStatus())); - } - - subSplits.clear(); - } - - if (i < allSplits.size()) { - currentChunk = - chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[i]); - } - } else { - BSONObj splitPoint(allSplits[i]); - - // Do not split on the boundaries - if (currentChunk->getMin().woCompare(splitPoint) == 0) { - continue; - } - - subSplits.push_back(splitPoint); - } - } - } + // Step 6. Migrate initial chunks to distribute them across shards. + migrateAndFurtherSplitInitialChunks( + opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, allSplits); return true; } |
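The heart of the new `determinePresplittingPoints()` helper is the arithmetic that carves the signed 64-bit hash range into `numChunks` intervals symmetric around zero, and then picks the subset of boundaries that defines one initial "big chunk" per shard. The sketch below reproduces that arithmetic outside the server, with plain `long long` values standing in for the single-field BSON split documents; `computeHashedSplitPoints` and `pickInitialSplitPoints` are illustrative names, not MongoDB APIs.

```cpp
// Illustrative sketch (not MongoDB library code): compute the numChunks - 1
// hash-range boundaries used as split points, plus the subset of boundaries
// that defines the initial "big chunks" (one per shard).
#include <algorithm>
#include <iostream>
#include <limits>
#include <vector>

std::vector<long long> computeHashedSplitPoints(int numChunks) {
    std::vector<long long> allSplits;

    // Hashes are signed 64-bit ints, so divide the range into intervals of
    // roughly 2^64 / numChunks and keep the layout symmetric around 0.
    const long long intervalSize =
        (std::numeric_limits<long long>::max() / numChunks) * 2;
    long long current = 0;

    if (numChunks % 2 == 0) {
        allSplits.push_back(current);  // even chunk count: 0 itself is a boundary
        current += intervalSize;
    } else {
        current += intervalSize / 2;   // odd chunk count: the middle chunk straddles 0
    }

    for (int i = 0; i < (numChunks - 1) / 2; i++) {
        allSplits.push_back(current);
        allSplits.push_back(-current);
        current += intervalSize;
    }

    std::sort(allSplits.begin(), allSplits.end());
    return allSplits;  // numChunks - 1 boundaries in ascending order
}

std::vector<long long> pickInitialSplitPoints(const std::vector<long long>& allSplits,
                                              int numChunks,
                                              int numShards) {
    // The initial splits carve out one "big chunk" per shard; the remaining
    // boundaries are applied later, after the big chunks have been migrated.
    std::vector<long long> initSplits;
    int lastIndex = -1;
    for (int i = 1; i < numShards; i++) {
        if (lastIndex < (i * numChunks) / numShards - 1) {
            lastIndex = (i * numChunks) / numShards - 1;
            initSplits.push_back(allSplits.at(lastIndex));
        }
    }
    return initSplits;  // at most numShards - 1 points
}

int main() {
    const int numShards = 3;
    const int numChunks = 2 * numShards;  // the default when none is requested

    const auto allSplits = computeHashedSplitPoints(numChunks);
    const auto initSplits = pickInitialSplitPoints(allSplits, numChunks, numShards);

    std::cout << allSplits.size() << " total boundaries, "
              << initSplits.size() << " initial big-chunk boundaries\n";
    return 0;
}
```

With the default of two chunks per shard, a three-shard cluster yields five boundaries in total, of which two are used to create the three initial big chunks on the primary shard.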
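Both constraint 1 (existing unique indexes must contain the proposed key as a prefix) and constraint 2.i (a "useful" index contains the proposed key as a prefix) in `validateShardKeyAgainstExistingIndexes()` come down to a prefix test over key patterns. The sketch below shows that rule with index key patterns modeled as ordered lists of field names; the real implementation works on full BSON key patterns (field names plus sort/hash values) through `ShardKeyPattern::isUniqueIndexCompatible` and `BSONObj::isPrefixOf`, so treat this only as an intuition aid.

```cpp
// Illustrative sketch (not MongoDB library code): the "shard key is a prefix"
// rule, with key patterns reduced to ordered field-name lists.
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

using KeyPattern = std::vector<std::string>;

// A unique index can coexist with the shard key only if the shard key is a
// prefix of the index; otherwise no single shard can enforce uniqueness.
bool shardKeyIsPrefixOf(const KeyPattern& shardKey, const KeyPattern& indexKey) {
    return shardKey.size() <= indexKey.size() &&
        std::equal(shardKey.begin(), shardKey.end(), indexKey.begin());
}

int main() {
    const KeyPattern shardKey = {"a"};

    // A unique index on {a: 1, b: 1} is compatible with shard key {a: 1} ...
    assert(shardKeyIsPrefixOf(shardKey, {"a", "b"}));

    // ... but a unique index on {b: 1} is not: two documents with the same b
    // but different a would live on different shards, so no single shard's
    // index could enforce global uniqueness of b.
    assert(!shardKeyIsPrefixOf(shardKey, {"b"}));

    return 0;
}
```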
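Once the big chunks have been migrated, `migrateAndFurtherSplitInitialChunks()` applies the remaining points from `allSplits` in per-chunk batches. The sketch below reproduces that batching loop with chunks modeled as half-open `long long` ranges over the hash space; `Chunk`, `findChunk`, and this `splitChunkAtMultiplePoints` are simplified stand-ins for the ChunkManager and `shardutil` machinery, not the actual MongoDB types.

```cpp
// Illustrative sketch (not MongoDB library code): batch split points by the
// chunk that contains them and issue one multi-point split per chunk.
#include <cstddef>
#include <iostream>
#include <vector>

struct Chunk {
    long long min;
    long long max;  // exclusive upper bound
    bool containsKey(long long k) const { return k >= min && k < max; }
};

// Hypothetical stand-in for shardutil::splitChunkAtMultiplePoints.
void splitChunkAtMultiplePoints(const Chunk& chunk, const std::vector<long long>& points) {
    std::cout << "split chunk [" << chunk.min << ", " << chunk.max << ") at "
              << points.size() << " point(s)\n";
}

// Walk the sorted split points, batch the ones that fall inside the same
// chunk, and skip points that coincide with a chunk's lower bound (those were
// already used when the big chunks were created).
void subdivideBigChunks(const std::vector<Chunk>& chunks,
                        const std::vector<long long>& allSplits) {
    if (allSplits.empty())
        return;

    auto findChunk = [&chunks](long long key) {
        for (const auto& c : chunks)
            if (c.containsKey(key))
                return c;
        return chunks.back();
    };

    Chunk current = findChunk(allSplits[0]);
    std::vector<long long> subSplits;

    for (std::size_t i = 0; i <= allSplits.size(); i++) {
        if (i == allSplits.size() || !current.containsKey(allSplits[i])) {
            if (!subSplits.empty()) {
                splitChunkAtMultiplePoints(current, subSplits);
                subSplits.clear();
            }
            if (i < allSplits.size())
                current = findChunk(allSplits[i]);
        } else if (allSplits[i] != current.min) {
            subSplits.push_back(allSplits[i]);
        }
    }
}

int main() {
    // Two "big chunks" covering the key space, already split at 0.
    const std::vector<Chunk> chunks = {{-9000, 0}, {0, 9000}};
    // All desired boundaries; 0 is skipped because it is already a chunk bound.
    const std::vector<long long> allSplits = {-6000, -3000, 0, 3000, 6000};

    subdivideBigChunks(chunks, allSplits);  // prints two splits of 2 points each
    return 0;
}
```

Batching the points per chunk means each chunk is split with a single request to its owning shard, rather than one request per split point.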