author    Esha Maharishi <esha.maharishi@mongodb.com>  2017-07-24 16:25:24 -0400
committer Esha Maharishi <esha.maharishi@mongodb.com>  2017-07-31 18:12:03 -0400
commit    6300b3bd4ad9cd238a02bdb8ca681a447913f1af (patch)
tree      f23f74c7729c6177b63f34261f473cec79fcddae
parent    6b34ad314773be3f3214d6f4186fde6ec0c12e39 (diff)
download  mongo-6300b3bd4ad9cd238a02bdb8ca681a447913f1af.tar.gz
SERVER-30292 split mind-bogglingly humongous shardCollection command body into a series of helper functions
-rw-r--r--  src/mongo/db/s/config/configsvr_shard_collection_command.cpp | 1054
1 file changed, 551 insertions, 503 deletions
diff --git a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
index e2fc9203c51..65112eec4ff 100644
--- a/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
+++ b/src/mongo/db/s/config/configsvr_shard_collection_command.cpp
@@ -122,6 +122,514 @@ BSONObj makeCreateIndexesCmd(const NamespaceString& nss,
}
/**
+ * Validates the options specified in the request.
+ *
+ * WARNING: After validating the request's collation, replaces it with the collection default
+ * collation.
+ */
+void validateAndDeduceFullRequestOptions(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardKeyPattern& shardKeyPattern,
+ int numShards,
+ ScopedDbConnection& conn,
+ ConfigsvrShardCollection* request) {
+ uassert(
+ ErrorCodes::InvalidOptions, "cannot have empty shard key", !request->getKey().isEmpty());
+
+ // Ensure the proposed shard key is valid.
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream()
+ << "Unsupported shard key pattern "
+ << shardKeyPattern.toString()
+ << ". Pattern must either be a single hashed field, or a list of ascending fields",
+ shardKeyPattern.isValid());
+
+ // Ensure that hashed and unique are not both set.
+ uassert(ErrorCodes::InvalidOptions,
+ "Hashed shard keys cannot be declared unique. It's possible to ensure uniqueness on "
+ "the hashed field by declaring an additional (non-hashed) unique index on the field.",
+ !shardKeyPattern.isHashedPattern() || !request->getUnique());
+
+ // Ensure the namespace is valid.
+ uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem());
+
+ // Ensure the collation is valid. Currently we only allow the simple collation.
+ bool simpleCollationSpecified = false;
+ if (request->getCollation()) {
+ auto& collation = *request->getCollation();
+ auto collator = uassertStatusOK(
+ CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation));
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "The collation for shardCollection must be {locale: 'simple'}, "
+ << "but found: "
+ << collation,
+ !collator);
+ simpleCollationSpecified = true;
+ }
+
+ // Ensure numInitialChunks is within valid bounds.
+ // Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000
+ // chunks in total to limit the amount of memory this command consumes so there is less
+ // danger of an OOM error.
+ const int maxNumInitialChunksForShards = numShards * 8192;
+ const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption
+ int numChunks = request->getNumInitialChunks();
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "numInitialChunks cannot be more than either: "
+ << maxNumInitialChunksForShards
+ << ", 8192 * number of shards; or "
+ << maxNumInitialChunksTotal,
+ numChunks <= maxNumInitialChunksForShards && numChunks <= maxNumInitialChunksTotal);
+
+ // Retrieve the collection metadata in order to verify that it is legal to shard this
+ // collection.
+ BSONObj res;
+ {
+ std::list<BSONObj> all =
+ conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ if (!all.empty()) {
+ res = all.front().getOwned();
+ }
+ }
+
+ BSONObj defaultCollation;
+
+ if (!res.isEmpty()) {
+ // Check that namespace is not a view.
+ {
+ std::string namespaceType;
+ uassertStatusOK(bsonExtractStringField(res, "type", &namespaceType));
+ uassert(ErrorCodes::CommandNotSupportedOnView,
+ "Views cannot be sharded.",
+ namespaceType != "view");
+ }
+
+ BSONObj collectionOptions;
+ if (res["options"].type() == BSONType::Object) {
+ collectionOptions = res["options"].Obj();
+ }
+
+ // Check that collection is not capped.
+ uassert(ErrorCodes::InvalidOptions,
+ "can't shard a capped collection",
+ !collectionOptions["capped"].trueValue());
+
+ // Get collection default collation.
+ BSONElement collationElement;
+ auto status = bsonExtractTypedField(
+ collectionOptions, "collation", BSONType::Object, &collationElement);
+ if (status.isOK()) {
+ defaultCollation = collationElement.Obj().getOwned();
+ uassert(ErrorCodes::BadValue,
+ "Default collation in collection metadata cannot be empty.",
+ !defaultCollation.isEmpty());
+ } else if (status != ErrorCodes::NoSuchKey) {
+ uassertStatusOK(status);
+ }
+
+ // If the collection has a non-simple default collation but the user did not specify the
+ // simple collation explicitly, return an error.
+ uassert(ErrorCodes::BadValue,
+ str::stream() << "Collection has default collation: "
+ << collectionOptions["collation"]
+ << ". Must specify collation {locale: 'simple'}",
+ defaultCollation.isEmpty() || simpleCollationSpecified);
+ }
+
+ // Once the request's collation has been validated as simple or unset, replace it with the
+ // deduced collection default collation.
+ request->setCollation(defaultCollation.getOwned());
+}
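
As a worked illustration of the numInitialChunks bound enforced above, the standalone sketch below plugs in a hypothetical 3-shard cluster. The 8192-per-shard limit and the 1,000,000 total cap are taken from the helper itself; the shard count, the requested chunk count, and the main() wrapper are made up for the example.

    #include <iostream>

    // Sketch of the numInitialChunks bound enforced by
    // validateAndDeduceFullRequestOptions(): at most 8192 initial chunks per
    // shard, and never more than 1,000,000 in total. The shard count and the
    // requested chunk count are illustrative values, not taken from the commit.
    int main() {
        const int numShards = 3;                      // hypothetical cluster size
        const int requestedNumInitialChunks = 30000;  // hypothetical request value

        const int maxNumInitialChunksForShards = numShards * 8192;  // 24576 for 3 shards
        const int maxNumInitialChunksTotal = 1000 * 1000;           // global memory-safety cap

        const bool ok = requestedNumInitialChunks <= maxNumInitialChunksForShards &&
            requestedNumInitialChunks <= maxNumInitialChunksTotal;

        std::cout << (ok ? "accepted" : "rejected: numInitialChunks too large") << std::endl;
        return 0;
    }
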
+
+/**
+ * Throws an exception if the collection is already sharded with different options.
+ *
+ * Returns true if the collection is already sharded with the same options, and false if the
+ * collection is not sharded.
+ */
+bool checkIfAlreadyShardedWithSameOptions(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ConfigsvrShardCollection& request) {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+
+ // We must reload the collection while metadata commands are split between mongos and the
+ // config servers.
+ catalogCache->invalidateShardedCollection(nss);
+ auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+
+ // If the collection is already sharded, fail if the deduced options in this request do not
+ // match the options the collection was originally sharded with.
+ if (routingInfo.cm()) {
+ auto existingColl =
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss.ns()))
+ .value;
+
+ CollectionType requestedOptions;
+ requestedOptions.setNs(nss);
+ requestedOptions.setKeyPattern(KeyPattern(request.getKey()));
+ requestedOptions.setDefaultCollation(*request.getCollation());
+ requestedOptions.setUnique(request.getUnique());
+
+ uassert(ErrorCodes::AlreadyInitialized,
+ str::stream() << "sharding already enabled for collection " << nss.ns()
+ << " with options "
+ << existingColl.toString(),
+ requestedOptions.hasSameOptions(existingColl));
+
+ // If the options do match, we can immediately return success.
+ return true;
+ }
+
+ // Not currently sharded.
+ return false;
+}
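
The idempotency rule above can be pictured with a simplified model: if the deduced request options match what the collection was originally sharded with, the command short-circuits to success; any mismatch is an error. The struct and comparison below are a hedged stand-in for that behavior, not the real CollectionType/hasSameOptions API.

    #include <stdexcept>
    #include <string>

    // Simplified stand-in for the options compared by hasSameOptions(); the real
    // code compares CollectionType documents read from the config server.
    struct ShardedCollectionOptions {
        std::string keyPattern;        // e.g. "{x: 'hashed'}"
        std::string defaultCollation;  // "" means the simple collation
        bool unique;

        bool operator==(const ShardedCollectionOptions& other) const {
            return keyPattern == other.keyPattern &&
                defaultCollation == other.defaultCollation && unique == other.unique;
        }
    };

    // Mirrors the control flow of checkIfAlreadyShardedWithSameOptions(): true means
    // "already sharded with identical options, return success immediately".
    bool alreadyShardedWithSameOptions(const ShardedCollectionOptions& requested,
                                       const ShardedCollectionOptions* existing /* null if unsharded */) {
        if (!existing) {
            return false;  // not sharded yet; continue with the rest of shardCollection
        }
        if (!(requested == *existing)) {
            throw std::runtime_error("sharding already enabled with different options");
        }
        return true;
    }

    int main() {
        ShardedCollectionOptions requested{"{x: 'hashed'}", "", false};
        ShardedCollectionOptions existing{"{x: 'hashed'}", "", false};
        // Identical options: shardCollection is a no-op success.
        return alreadyShardedWithSameOptions(requested, &existing) ? 0 : 1;
    }
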
+
+/**
+ * Compares the proposed shard key with the collection's existing indexes on the primary shard to
+ * ensure they are a legal combination.
+ *
+ * If the collection is empty and no index on the shard key exists, creates the required index.
+ */
+void validateShardKeyAgainstExistingIndexes(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& proposedKey,
+ const ShardKeyPattern& shardKeyPattern,
+ const std::shared_ptr<Shard> primaryShard,
+ ScopedDbConnection& conn,
+ const ConfigsvrShardCollection& request) {
+ // The proposed shard key must be validated against the set of existing indexes.
+ // In particular, we must ensure the following constraints
+ //
+ // 1. All existing unique indexes, except those which start with the _id index,
+ // must contain the proposed key as a prefix (uniqueness of the _id index is
+ // ensured by the _id generation process or guaranteed by the user).
+ //
+ // 2. If the collection is not empty, there must exist at least one index that
+ // is "useful" for the proposed key. A "useful" index is defined as follows
+ // Useful Index:
+ // i. contains proposedKey as a prefix
+ // ii. is not a sparse index, partial index, or index with a non-simple collation
+ // iii. contains no null values
+ // iv. is not multikey (maybe lift this restriction later)
+ // v. if a hashed index, has default seed (lift this restriction later)
+ //
+ // 3. If the proposed shard key is specified as unique, there must exist a useful,
+ // unique index exactly equal to the proposedKey (not just a prefix).
+ //
+ // After validating these constraints:
+ //
+ // 4. If there is no useful index, and the collection is non-empty, we
+ // must fail.
+ //
+ // 5. If the collection is empty, and it's still possible to create an index
+ // on the proposed key, we go ahead and do so.
+
+ std::list<BSONObj> indexes = conn->getIndexSpecs(nss.ns());
+
+ // 1. Verify consistency with existing unique indexes
+ for (const auto& idx : indexes) {
+ BSONObj currentKey = idx["key"].embeddedObject();
+ bool isUnique = idx["unique"].trueValue();
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "can't shard collection '" << nss.ns() << "' with unique index on "
+ << currentKey
+ << " and proposed shard key "
+ << proposedKey
+ << ". Uniqueness can't be maintained unless shard key is a prefix",
+ !isUnique || shardKeyPattern.isUniqueIndexCompatible(currentKey));
+ }
+
+ // 2. Check for a useful index
+ bool hasUsefulIndexForKey = false;
+ for (const auto& idx : indexes) {
+ BSONObj currentKey = idx["key"].embeddedObject();
+ // Check 2.i. and 2.ii.
+ if (!idx["sparse"].trueValue() && idx["filter"].eoo() && idx["collation"].eoo() &&
+ proposedKey.isPrefixOf(currentKey, SimpleBSONElementComparator::kInstance)) {
+ // We can't currently use hashed indexes with a non-default hash seed
+ // Check v.
+ // Note that this means that, for sharding, we only support one hashed index
+ // per field per collection.
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "can't shard collection " << nss.ns()
+ << " with hashed shard key "
+ << proposedKey
+ << " because the hashed index uses a non-default seed of "
+ << idx["seed"].numberInt(),
+ !shardKeyPattern.isHashedPattern() || idx["seed"].eoo() ||
+ idx["seed"].numberInt() == BSONElementHasher::DEFAULT_HASH_SEED);
+ hasUsefulIndexForKey = true;
+ }
+ }
+
+ // 3. If proposed key is required to be unique, additionally check for exact match.
+
+ if (hasUsefulIndexForKey && request.getUnique()) {
+ BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey);
+ BSONObj eqQueryResult;
+
+ for (const auto& idx : indexes) {
+ if (SimpleBSONObjComparator::kInstance.evaluate(idx["key"].embeddedObject() ==
+ proposedKey)) {
+ eqQueryResult = idx;
+ break;
+ }
+ }
+
+ if (eqQueryResult.isEmpty()) {
+ // If no exact match, index not useful, but still possible to create one later
+ hasUsefulIndexForKey = false;
+ } else {
+ bool isExplicitlyUnique = eqQueryResult["unique"].trueValue();
+ BSONObj currKey = eqQueryResult["key"].embeddedObject();
+ bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id");
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "can't shard collection " << nss.ns() << ", " << proposedKey
+ << " index not unique, and unique index explicitly specified",
+ isExplicitlyUnique || isCurrentID);
+ }
+ }
+
+ if (hasUsefulIndexForKey) {
+ // Check 2.iii and 2.iv. Make sure no null entries in the sharding index
+ // and that there is a useful, non-multikey index available
+ BSONObjBuilder checkShardingIndexCmd;
+ checkShardingIndexCmd.append("checkShardingIndex", nss.ns());
+ checkShardingIndexCmd.append("keyPattern", proposedKey);
+ BSONObj res;
+ auto success = conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res);
+ uassert(ErrorCodes::OperationFailed, res["errmsg"].str(), success);
+ } else if (conn->count(nss.ns()) != 0) {
+ // 4. if no useful index, and collection is non-empty, fail
+ uasserted(ErrorCodes::InvalidOptions,
+ "Please create an index that starts with the proposed shard key before "
+ "sharding the collection");
+ } else {
+ // 5. If no useful index exists, and collection empty, create one on proposedKey.
+ // Only need to call ensureIndex on primary shard, since indexes get copied to
+ // receiving shard whenever a migrate occurs.
+ // If the collection has a default collation, explicitly send the simple
+ // collation as part of the createIndex request.
+ BSONObj collation =
+ !request.getCollation()->isEmpty() ? CollationSpec::kSimpleSpec : BSONObj();
+ auto createIndexesCmd =
+ makeCreateIndexesCmd(nss, proposedKey, collation, request.getUnique());
+
+ const auto swResponse = primaryShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ nss.db().toString(),
+ createIndexesCmd,
+ Shard::RetryPolicy::kNotIdempotent);
+ auto createIndexesStatus = swResponse.getStatus();
+ if (createIndexesStatus.isOK()) {
+ const auto response = swResponse.getValue();
+ createIndexesStatus = (!response.commandStatus.isOK()) ? response.commandStatus
+ : response.writeConcernStatus;
+ }
+ uassertStatusOK(createIndexesStatus);
+ }
+}
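
Constraint 2.i above boils down to a prefix test between key patterns. The sketch below models a key pattern as an ordered list of field names and ignores sort direction and hashing; it illustrates the rule only and is not the actual ShardKeyPattern/isPrefixOf implementation.

    #include <iostream>
    #include <string>
    #include <vector>

    // Hedged stand-in for the "proposedKey.isPrefixOf(currentKey)" check used by
    // constraint 2.i: a key pattern is modeled as an ordered list of field names.
    bool isKeyPrefixOf(const std::vector<std::string>& proposedKey,
                       const std::vector<std::string>& indexKey) {
        if (proposedKey.size() > indexKey.size()) {
            return false;
        }
        for (size_t i = 0; i < proposedKey.size(); ++i) {
            if (proposedKey[i] != indexKey[i]) {
                return false;
            }
        }
        return true;
    }

    int main() {
        // A proposed key {a: 1, b: 1} is a prefix of an index {a: 1, b: 1, c: 1}, so
        // that index can be "useful" for sharding (subject to the other checks); the
        // reversed-order index {b: 1, a: 1} is not.
        std::cout << std::boolalpha
                  << isKeyPrefixOf({"a", "b"}, {"a", "b", "c"}) << "\n"   // true
                  << isKeyPrefixOf({"a", "b"}, {"b", "a"}) << "\n";       // false
        return 0;
    }
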
+
+/**
+ * For new collections which use hashed shard keys, we can pre-split the range of possible
+ * hashes into a large number of chunks, and distribute them evenly at creation time. Until we
+ * design a better initialization scheme, the safest way to pre-split is to make one big chunk for
+ * each shard and migrate them one at a time.
+ *
+ * Populates 'initSplits' with the split points to use on the primary shard to produce the initial
+ * "big chunks."
+ * Also populates 'allSplits' with the additional split points to use on the "big chunks" after the
+ * "big chunks" have been spread evenly across shards through migrations.
+ */
+void determinePresplittingPoints(OperationContext* opCtx,
+ int numShards,
+ bool isEmpty,
+ const BSONObj& proposedKey,
+ const ShardKeyPattern& shardKeyPattern,
+ const ConfigsvrShardCollection& request,
+ std::vector<BSONObj>* initSplits,
+ std::vector<BSONObj>* allSplits) {
+ auto numChunks = request.getNumInitialChunks();
+
+ if (request.getInitialSplitPoints()) {
+ *initSplits = std::move(*request.getInitialSplitPoints());
+ return;
+ }
+
+ if (shardKeyPattern.isHashedPattern() && isEmpty) {
+ // If initial split points are not specified, only pre-split when using a hashed shard
+ // key and the collection is empty
+ if (numChunks <= 0) {
+ // default number of initial chunks
+ numChunks = 2 * numShards;
+ }
+
+ // hashes are signed 64-bit ints, so we divide the range [MIN long, MAX long]
+ // into intervals of size (2^64/numChunks) and create split points at the
+ // boundaries. The logic below ensures that initial chunks are all
+ // symmetric around 0.
+ long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2;
+ long long current = 0;
+
+ if (numChunks % 2 == 0) {
+ allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current));
+ current += intervalSize;
+ } else {
+ current += intervalSize / 2;
+ }
+
+ for (int i = 0; i < (numChunks - 1) / 2; i++) {
+ allSplits->push_back(BSON(proposedKey.firstElementFieldName() << current));
+ allSplits->push_back(BSON(proposedKey.firstElementFieldName() << -current));
+ current += intervalSize;
+ }
+
+ sort(allSplits->begin(),
+ allSplits->end(),
+ SimpleBSONObjComparator::kInstance.makeLessThan());
+
+ // The initial splits define the "big chunks" that we will subdivide later.
+ int lastIndex = -1;
+ for (int i = 1; i < numShards; i++) {
+ if (lastIndex < (i * numChunks) / numShards - 1) {
+ lastIndex = (i * numChunks) / numShards - 1;
+ initSplits->push_back(allSplits->at(lastIndex));
+ }
+ }
+ } else if (numChunks > 0) {
+ uasserted(ErrorCodes::InvalidOptions,
+ str::stream() << (!shardKeyPattern.isHashedPattern()
+ ? "numInitialChunks is not supported "
+ "when the shard key is not hashed."
+ : "numInitialChunks is not supported "
+ "when the collection is not empty."));
+ }
+}
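
The split-point arithmetic above is easiest to see with concrete numbers. The standalone sketch below reproduces it with plain integers for an assumed 2-shard cluster and the default numChunks = 2 * numShards = 4; everything it prints follows directly from intervalSize = (LLONG_MAX / numChunks) * 2, and the assumed counts are illustrative only.

    #include <algorithm>
    #include <iostream>
    #include <limits>
    #include <vector>

    // Reproduces the hashed-key pre-splitting arithmetic with plain integers in
    // place of BSON split documents. Shard and chunk counts are illustrative.
    int main() {
        const int numShards = 2;
        int numChunks = 2 * numShards;  // default when numInitialChunks is not given

        const long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2;
        long long current = 0;

        std::vector<long long> allSplits;
        if (numChunks % 2 == 0) {
            allSplits.push_back(current);  // even chunk count: split at 0 so chunks are symmetric
            current += intervalSize;
        } else {
            current += intervalSize / 2;
        }
        for (int i = 0; i < (numChunks - 1) / 2; i++) {
            allSplits.push_back(current);
            allSplits.push_back(-current);
            current += intervalSize;
        }
        std::sort(allSplits.begin(), allSplits.end());

        // The "big chunks" created on the primary shard use roughly one split per shard.
        std::vector<long long> initSplits;
        int lastIndex = -1;
        for (int i = 1; i < numShards; i++) {
            if (lastIndex < (i * numChunks) / numShards - 1) {
                lastIndex = (i * numChunks) / numShards - 1;
                initSplits.push_back(allSplits.at(lastIndex));
            }
        }

        for (long long s : allSplits) std::cout << "split point: " << s << "\n";
        for (long long s : initSplits) std::cout << "initial (big chunk) split: " << s << "\n";
        return 0;
    }
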
+
+/**
+ * Migrates the initial "big chunks" from the primary shard to spread them evenly across the shards.
+ *
+ * If 'allSplits' is not empty, additionally splits each "big chunk" into smaller chunks using the
+ * points in 'allSplits.'
+ */
+void migrateAndFurtherSplitInitialChunks(OperationContext* opCtx,
+ const NamespaceString& nss,
+ int numShards,
+ const std::vector<ShardId>& shardIds,
+ bool isEmpty,
+ const ShardKeyPattern& shardKeyPattern,
+ const std::vector<BSONObj>& allSplits) {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+
+ if (!shardKeyPattern.isHashedPattern()) {
+ // Only initially move chunks when using a hashed shard key.
+ return;
+ }
+
+ if (!isEmpty) {
+ // If the collection is not empty, rely on the balancer to migrate the chunks.
+ return;
+ }
+
+ auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Collection was successfully written as sharded but got dropped before it "
+ "could be evenly distributed",
+ routingInfo.cm());
+
+ auto chunkManager = routingInfo.cm();
+
+ // Move and commit each "big chunk" to a different shard.
+ int i = 0;
+ for (auto chunk : chunkManager->chunks()) {
+ const ShardId& shardId = shardIds[i++ % numShards];
+ const auto toStatus = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
+ if (!toStatus.isOK()) {
+ continue;
+ }
+ const auto to = toStatus.getValue();
+
+ // Can't move chunk to shard it's already on
+ if (to->getId() == chunk->getShardId()) {
+ continue;
+ }
+
+ ChunkType chunkType;
+ chunkType.setNS(nss.ns());
+ chunkType.setMin(chunk->getMin());
+ chunkType.setMax(chunk->getMax());
+ chunkType.setShard(chunk->getShardId());
+ chunkType.setVersion(chunkManager->getVersion());
+
+ Status moveStatus = configsvr_client::moveChunk(
+ opCtx,
+ chunkType,
+ to->getId(),
+ Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
+ MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff),
+ true);
+ if (!moveStatus.isOK()) {
+ warning() << "couldn't move chunk " << redact(chunk->toString()) << " to shard " << *to
+ << " while sharding collection " << nss.ns() << causedBy(redact(moveStatus));
+ }
+ }
+
+ if (allSplits.empty()) {
+ return;
+ }
+
+ // Reload the config info, after all the migrations
+ catalogCache->invalidateShardedCollection(nss);
+ routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Collection was successfully written as sharded but got dropped before it "
+ "could be evenly distributed",
+ routingInfo.cm());
+ chunkManager = routingInfo.cm();
+
+ // Subdivide the big chunks by splitting at each of the points in "allSplits"
+ // that we haven't already split by.
+ auto currentChunk = chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[0]);
+
+ std::vector<BSONObj> subSplits;
+ for (unsigned i = 0; i <= allSplits.size(); i++) {
+ if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) {
+ if (!subSplits.empty()) {
+ auto splitStatus = shardutil::splitChunkAtMultiplePoints(
+ opCtx,
+ currentChunk->getShardId(),
+ nss,
+ chunkManager->getShardKeyPattern(),
+ chunkManager->getVersion(),
+ ChunkRange(currentChunk->getMin(), currentChunk->getMax()),
+ subSplits);
+ if (!splitStatus.isOK()) {
+ warning() << "couldn't split chunk " << redact(currentChunk->toString())
+ << " while sharding collection " << nss.ns()
+ << causedBy(redact(splitStatus.getStatus()));
+ }
+
+ subSplits.clear();
+ }
+
+ if (i < allSplits.size()) {
+ currentChunk = chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[i]);
+ }
+ } else {
+ BSONObj splitPoint(allSplits[i]);
+
+ // Do not split on the boundaries
+ if (currentChunk->getMin().woCompare(splitPoint) == 0) {
+ continue;
+ }
+
+ subSplits.push_back(splitPoint);
+ }
+ }
+}
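
The subdivision pass at the end of this helper effectively groups the sorted split points by the big chunk that contains them and issues one multi-point split per chunk, skipping points that land on a chunk boundary. The sketch below shows that grouping with integer ranges standing in for shard key values; it is not the real ChunkManager/splitChunkAtMultiplePoints API.

    #include <iostream>
    #include <utility>
    #include <vector>

    // Hedged sketch of the subdivision pass: for each big chunk [min, max), batch
    // the sorted split points that fall strictly inside it (boundaries are skipped)
    // into one multi-point split request per chunk.
    int main() {
        using Range = std::pair<long long, long long>;               // [min, max)
        const std::vector<Range> bigChunks = {{-100, 0}, {0, 100}};  // illustrative ranges
        const std::vector<long long> allSplits = {-50, 0, 50};       // sorted split points

        for (const Range& chunk : bigChunks) {
            std::vector<long long> subSplits;
            for (long long point : allSplits) {
                if (point > chunk.first && point < chunk.second) {  // do not split on the boundaries
                    subSplits.push_back(point);
                }
            }
            if (!subSplits.empty()) {
                std::cout << "split chunk [" << chunk.first << ", " << chunk.second << ") at:";
                for (long long p : subSplits) std::cout << " " << p;
                std::cout << "\n";
            }
        }
        return 0;
    }
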
+
+/**
* Internal sharding command run on config servers to shard a collection.
*/
class ConfigSvrShardCollectionCommand : public BasicCommand {
@@ -170,561 +678,101 @@ public:
serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
const NamespaceString nss(parseNs(dbname, cmdObj));
- auto shardCollRequest = ConfigsvrShardCollection::parse(
+ auto request = ConfigsvrShardCollection::parse(
IDLParserErrorContext("ConfigsvrShardCollection"), cmdObj);
- auto const catalogClient = Grid::get(opCtx)->catalogClient();
auto const catalogManager = ShardingCatalogManager::get(opCtx);
- auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
auto const catalogCache = Grid::get(opCtx)->catalogCache();
- // Lock the collection to prevent older mongos instances from trying to shard or drop it at
- // the same time
+ // Lock the collection to prevent older mongos instances from trying to shard or drop it
+ // concurrently.
boost::optional<DistLockManager::ScopedDistLock> scopedDistLock(
- uassertStatusOK(catalogClient->getDistLockManager()->lock(
+ uassertStatusOK(Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock(
opCtx, nss.ns(), "shardCollection", DistLockManager::kDefaultLockTimeout)));
// Ensure sharding is allowed on the database
- auto dbType = uassertStatusOK(catalogClient->getDatabase(opCtx, nss.db().toString())).value;
+ auto dbType = uassertStatusOK(Grid::get(opCtx)->catalogClient()->getDatabase(
+ opCtx, nss.db().toString()))
+ .value;
uassert(ErrorCodes::IllegalOperation,
str::stream() << "sharding not enabled for db " << nss.db(),
dbType.getSharded());
- // Ensure that the collection is not already sharded with different options.
-
- // We must reload the collection while metadata commands are split between mongos and the
- // config servers.
- catalogCache->invalidateShardedCollection(nss);
- auto routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
-
- if (shardCollRequest.getKey().isEmpty()) {
- return appendCommandStatus(
- result,
- {ErrorCodes::InvalidOptions, str::stream() << "Cannot have an empty shard key"});
- }
+ // Get variables required throughout this command.
- auto proposedKey(shardCollRequest.getKey().getOwned());
+ auto proposedKey(request.getKey().getOwned());
ShardKeyPattern shardKeyPattern(proposedKey);
- if (!shardKeyPattern.isValid()) {
- return appendCommandStatus(result,
- {ErrorCodes::InvalidOptions,
- str::stream() << "Unsupported shard key pattern "
- << shardKeyPattern.toString()
- << ". Pattern must either be a single hashed "
- "field, or a list of ascending fields"});
- }
-
- bool isHashedShardKey = shardKeyPattern.isHashedPattern();
- bool careAboutUnique = shardCollRequest.getUnique();
-
- if (isHashedShardKey && careAboutUnique) {
- dassert(proposedKey.nFields() == 1);
- return appendCommandStatus(result,
- {ErrorCodes::InvalidOptions,
- "Hashed shard keys cannot be declared unique. It's "
- "possible to ensure uniqueness on the hashed field by "
- "declaring an additional (non-hashed) unique index on the "
- "field."});
- }
-
- uassert(ErrorCodes::IllegalOperation, "can't shard system namespaces", !nss.isSystem());
-
- // Ensure that the collation is valid. Currently we only allow the simple collation.
- bool simpleCollationSpecified = false;
- if (shardCollRequest.getCollation()) {
- auto& collation = *shardCollRequest.getCollation();
- auto collator =
- CollatorFactoryInterface::get(opCtx->getServiceContext())->makeFromBSON(collation);
- if (!collator.getStatus().isOK()) {
- return appendCommandStatus(result, collator.getStatus());
- }
- if (collator.getValue()) {
- return appendCommandStatus(
- result,
- {ErrorCodes::BadValue,
- str::stream()
- << "The collation for shardCollection must be {locale: 'simple'}, "
- << "but found: "
- << collation});
- }
- simpleCollationSpecified = true;
- }
std::vector<ShardId> shardIds;
- shardRegistry->getAllShardIds(&shardIds);
-
+ Grid::get(opCtx)->shardRegistry()->getAllShardIds(&shardIds);
const int numShards = shardIds.size();
- // Cannot have more than 8192 initial chunks per shard. Setting a maximum of 1,000,000
- // chunks in total to limit the amount of memory this command consumes so there is less
- // danger of an OOM error.
- const int maxNumInitialChunksForShards = numShards * 8192;
- const int maxNumInitialChunksTotal = 1000 * 1000; // Arbitrary limit to memory consumption
- int numChunks = shardCollRequest.getNumInitialChunks();
- if (numChunks > maxNumInitialChunksForShards || numChunks > maxNumInitialChunksTotal) {
- return appendCommandStatus(result,
- {ErrorCodes::InvalidOptions,
- str::stream()
- << "numInitialChunks cannot be more than either: "
- << maxNumInitialChunksForShards
- << ", 8192 * number of shards; or "
- << maxNumInitialChunksTotal});
- }
-
- // The rest of the checks require a connection to the primary db
- auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbType.getPrimary()));
+ auto primaryShard = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, dbType.getPrimary()));
ScopedDbConnection conn(primaryShard->getConnString());
ON_BLOCK_EXIT([&conn] { conn.done(); });
- // Retrieve the collection metadata in order to verify that it is legal to shard this
- // collection.
- BSONObj res;
- {
- std::list<BSONObj> all =
- conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- if (!all.empty()) {
- res = all.front().getOwned();
- }
- }
-
- BSONObj defaultCollation;
-
- if (!res.isEmpty()) {
- // Check that namespace is not a view.
- {
- std::string namespaceType;
- auto status = bsonExtractStringField(res, "type", &namespaceType);
- if (!status.isOK()) {
- return appendCommandStatus(result, status);
- }
-
- if (namespaceType == "view") {
- return appendCommandStatus(
- result,
- {ErrorCodes::CommandNotSupportedOnView, "Views cannot be sharded."});
- }
- }
-
- BSONObj collectionOptions;
- if (res["options"].type() == BSONType::Object) {
- collectionOptions = res["options"].Obj();
- }
-
- // Check that collection is not capped.
- if (collectionOptions["capped"].trueValue()) {
- return appendCommandStatus(
- result, {ErrorCodes::InvalidOptions, "can't shard a capped collection"});
- }
-
- // Get collection default collation.
- {
- BSONElement collationElement;
- auto status = bsonExtractTypedField(
- collectionOptions, "collation", BSONType::Object, &collationElement);
- if (status.isOK()) {
- defaultCollation = collationElement.Obj().getOwned();
- if (defaultCollation.isEmpty()) {
- return appendCommandStatus(
- result,
- {ErrorCodes::BadValue,
- "Default collation in collection metadata cannot be empty."});
- }
- } else if (status != ErrorCodes::NoSuchKey) {
- return appendCommandStatus(
- result,
- {status.code(),
- str::stream()
- << "Could not parse default collation in collection metadata "
- << causedBy(status)});
- }
- }
+ // Step 1.
+ validateAndDeduceFullRequestOptions(opCtx, nss, shardKeyPattern, numShards, conn, &request);
- // If the collection has a non-simple default collation but the user did not specify the
- // simple collation explicitly, return an error.
- if (!defaultCollation.isEmpty() && !simpleCollationSpecified) {
- return appendCommandStatus(result,
- {ErrorCodes::BadValue,
- str::stream()
- << "Collection has default collation: "
- << collectionOptions["collation"]
- << ". Must specify collation {locale: 'simple'}"});
- }
- }
+ // The collation option should have been set to the collection default collation after being
+ // validated.
+ invariant(request.getCollation());
- // If the collection is already sharded, fail if the deduced options in this request do not
- // match the options the collection was originally sharded with.
- if (routingInfo.cm()) {
- auto existingColl =
- uassertStatusOK(catalogClient->getCollection(opCtx, nss.ns())).value;
-
- CollectionType requestedOptions;
- requestedOptions.setNs(nss);
- requestedOptions.setKeyPattern(KeyPattern(proposedKey));
- requestedOptions.setDefaultCollation(defaultCollation);
- requestedOptions.setUnique(careAboutUnique);
-
- uassert(ErrorCodes::AlreadyInitialized,
- str::stream() << "sharding already enabled for collection " << nss.ns()
- << " with options "
- << existingColl.toString(),
- requestedOptions.hasSameOptions(existingColl));
-
- // If the options do match, we can immediately return success.
+ // Step 2.
+ if (checkIfAlreadyShardedWithSameOptions(opCtx, nss, request)) {
return true;
}
- // The proposed shard key must be validated against the set of existing indexes.
- // In particular, we must ensure the following constraints
- //
- // 1. All existing unique indexes, except those which start with the _id index,
- // must contain the proposed key as a prefix (uniqueness of the _id index is
- // ensured by the _id generation process or guaranteed by the user).
- //
- // 2. If the collection is not empty, there must exist at least one index that
- // is "useful" for the proposed key. A "useful" index is defined as follows
- // Useful Index:
- // i. contains proposedKey as a prefix
- // ii. is not a sparse index, partial index, or index with a non-simple collation
- // iii. contains no null values
- // iv. is not multikey (maybe lift this restriction later)
- // v. if a hashed index, has default seed (lift this restriction later)
- //
- // 3. If the proposed shard key is specified as unique, there must exist a useful,
- // unique index exactly equal to the proposedKey (not just a prefix).
- //
- // After validating these constraint:
- //
- // 4. If there is no useful index, and the collection is non-empty, we
- // must fail.
- //
- // 5. If the collection is empty, and it's still possible to create an index
- // on the proposed key, we go ahead and do so.
- std::list<BSONObj> indexes = conn->getIndexSpecs(nss.ns());
-
- // 1. Verify consistency with existing unique indexes
-
- for (const auto& idx : indexes) {
- BSONObj currentKey = idx["key"].embeddedObject();
- bool isUnique = idx["unique"].trueValue();
- if (isUnique && !shardKeyPattern.isUniqueIndexCompatible(currentKey)) {
- return appendCommandStatus(
- result,
- {ErrorCodes::InvalidOptions,
- str::stream()
- << "can't shard collection '"
- << nss.ns()
- << "' with unique index on "
- << currentKey
- << " and proposed shard key "
- << proposedKey
- << ". Uniqueness can't be maintained unless shard key is a prefix"});
- }
- }
-
- // 2. Check for a useful index
- bool hasUsefulIndexForKey = false;
- for (const auto& idx : indexes) {
- BSONObj currentKey = idx["key"].embeddedObject();
- // Check 2.i. and 2.ii.
- if (!idx["sparse"].trueValue() && idx["filter"].eoo() && idx["collation"].eoo() &&
- proposedKey.isPrefixOf(currentKey, SimpleBSONElementComparator::kInstance)) {
- // We can't currently use hashed indexes with a non-default hash seed
- // Check v.
- // Note that this means that, for sharding, we only support one hashed index
- // per field per collection.
- if (isHashedShardKey && !idx["seed"].eoo() &&
- idx["seed"].numberInt() != BSONElementHasher::DEFAULT_HASH_SEED) {
- return appendCommandStatus(
- result,
- {ErrorCodes::InvalidOptions,
- str::stream() << "can't shard collection " << nss.ns()
- << " with hashed shard key "
- << proposedKey
- << " because the hashed index uses a non-default seed of "
- << idx["seed"].numberInt()});
- }
- hasUsefulIndexForKey = true;
- }
- }
-
- // 3. If proposed key is required to be unique, additionally check for exact match.
-
- if (hasUsefulIndexForKey && careAboutUnique) {
- BSONObj eqQuery = BSON("ns" << nss.ns() << "key" << proposedKey);
- BSONObj eqQueryResult;
-
- for (const auto& idx : indexes) {
- if (SimpleBSONObjComparator::kInstance.evaluate(idx["key"].embeddedObject() ==
- proposedKey)) {
- eqQueryResult = idx;
- break;
- }
- }
-
- if (eqQueryResult.isEmpty()) {
- // If no exact match, index not useful, but still possible to create one later
- hasUsefulIndexForKey = false;
- } else {
- bool isExplicitlyUnique = eqQueryResult["unique"].trueValue();
- BSONObj currKey = eqQueryResult["key"].embeddedObject();
- bool isCurrentID = str::equals(currKey.firstElementFieldName(), "_id");
-
- if (!isExplicitlyUnique && !isCurrentID) {
- return appendCommandStatus(
- result,
- {ErrorCodes::InvalidOptions,
- str::stream()
- << "can't shard collection "
- << nss.ns()
- << ", "
- << proposedKey
- << " index not unique, and unique index explicitly specified"});
- }
- }
- }
-
- if (hasUsefulIndexForKey) {
- // Check 2.iii and 2.iv. Make sure no null entries in the sharding index
- // and that there is a useful, non-multikey index available
- BSONObjBuilder checkShardingIndexCmd;
- checkShardingIndexCmd.append("checkShardingIndex", nss.ns());
- checkShardingIndexCmd.append("keyPattern", proposedKey);
-
- if (!conn.get()->runCommand("admin", checkShardingIndexCmd.obj(), res)) {
- return appendCommandStatus(result,
- {ErrorCodes::OperationFailed, res["errmsg"].str()});
- }
- } else if (conn->count(nss.ns()) != 0) {
- // 4. if no useful index, and collection is non-empty, fail
- return appendCommandStatus(
- result,
- {ErrorCodes::InvalidOptions,
- "Please create an index that starts with the proposed shard key before "
- "sharding the collection"});
- } else {
- // 5. If no useful index exists, and collection empty, create one on proposedKey.
- // Only need to call ensureIndex on primary shard, since indexes get copied to
- // receiving shard whenever a migrate occurs.
- // If the collection has a default collation, explicitly send the simple
- // collation as part of the createIndex request.
- BSONObj collation =
- !defaultCollation.isEmpty() ? CollationSpec::kSimpleSpec : BSONObj();
- auto createIndexesCmd =
- makeCreateIndexesCmd(nss, proposedKey, collation, careAboutUnique);
-
- const auto swResponse = primaryShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- nss.db().toString(),
- createIndexesCmd,
- Shard::RetryPolicy::kNotIdempotent);
- auto createIndexesStatus = swResponse.getStatus();
- if (createIndexesStatus.isOK()) {
- const auto response = swResponse.getValue();
- createIndexesStatus = (!response.commandStatus.isOK())
- ? response.commandStatus
- : response.writeConcernStatus;
- }
- uassertStatusOK(createIndexesStatus);
- }
+ // Step 3.
+ validateShardKeyAgainstExistingIndexes(
+ opCtx, nss, proposedKey, shardKeyPattern, primaryShard, conn, request);
+ // isEmpty is used by multiple steps below.
bool isEmpty = (conn->count(nss.ns()) == 0);
- // Pre-splitting:
- // For new collections which use hashed shard keys, we can can pre-split the
- // range of possible hashes into a large number of chunks, and distribute them
- // evenly at creation time. Until we design a better initialization scheme, the
- // safest way to pre-split is to
- // 1. make one big chunk for each shard
- // 2. move them one at a time
- // 3. split the big chunks to achieve the desired total number of initial chunks
-
+ // Step 4.
std::vector<BSONObj> initSplits; // there will be at most numShards-1 of these
std::vector<BSONObj> allSplits; // all of the initial desired split points
-
- if (shardCollRequest.getInitialSplitPoints()) {
- initSplits = std::move(*shardCollRequest.getInitialSplitPoints());
- } else if (isHashedShardKey && isEmpty) {
- // If initial split points are not specified, only pre-split when using a hashed shard
- // key and the collection is empty
-
- if (numChunks <= 0) {
- // default number of initial chunks
- numChunks = 2 * numShards;
- }
-
- // hashes are signed, 64-bit ints. So we divide the range (-MIN long, +MAX long)
- // into intervals of size (2^64/numChunks) and create split points at the
- // boundaries. The logic below ensures that initial chunks are all
- // symmetric around 0.
- long long intervalSize = (std::numeric_limits<long long>::max() / numChunks) * 2;
- long long current = 0;
-
- if (numChunks % 2 == 0) {
- allSplits.push_back(BSON(proposedKey.firstElementFieldName() << current));
- current += intervalSize;
- } else {
- current += intervalSize / 2;
- }
-
- for (int i = 0; i < (numChunks - 1) / 2; i++) {
- allSplits.push_back(BSON(proposedKey.firstElementFieldName() << current));
- allSplits.push_back(BSON(proposedKey.firstElementFieldName() << -current));
- current += intervalSize;
- }
-
- sort(allSplits.begin(),
- allSplits.end(),
- SimpleBSONObjComparator::kInstance.makeLessThan());
-
- // 1. the initial splits define the "big chunks" that we will subdivide later
- int lastIndex = -1;
- for (int i = 1; i < numShards; i++) {
- if (lastIndex < (i * numChunks) / numShards - 1) {
- lastIndex = (i * numChunks) / numShards - 1;
- initSplits.push_back(allSplits[lastIndex]);
- }
- }
- } else if (numChunks > 0) {
- return appendCommandStatus(
- result,
- {ErrorCodes::InvalidOptions,
- str::stream() << (!isHashedShardKey ? "numInitialChunks is not supported "
- "when the shard key is not hashed."
- : "numInitialChunks is not supported "
- "when the collection is not empty.")});
- }
+ determinePresplittingPoints(opCtx,
+ numShards,
+ isEmpty,
+ proposedKey,
+ shardKeyPattern,
+ request,
+ &initSplits,
+ &allSplits);
LOG(0) << "CMD: shardcollection: " << cmdObj;
- audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, careAboutUnique);
+ audit::logShardCollection(Client::getCurrent(), nss.ns(), proposedKey, request.getUnique());
// The initial chunks are distributed evenly across shards only if the initial split points
// were specified in the request, i.e., by mapReduce. Otherwise, all the initial chunks are
// placed on the primary shard, and may be distributed across shards through migrations
// (below) if using a hashed shard key.
- const bool distributeInitialChunks =
- shardCollRequest.getInitialSplitPoints().is_initialized();
+ const bool distributeInitialChunks = request.getInitialSplitPoints().is_initialized();
+ // Step 5. Actually shard the collection.
catalogManager->shardCollection(opCtx,
nss.ns(),
shardKeyPattern,
- defaultCollation,
- careAboutUnique,
+ *request.getCollation(),
+ request.getUnique(),
initSplits,
distributeInitialChunks);
-
- // Free the collection dist lock in order to allow the initial splits and moves below to
- // proceed
- scopedDistLock.reset();
-
result << "collectionsharded" << nss.ns();
// Make sure the cached metadata for the collection knows that we are now sharded
catalogCache->invalidateShardedCollection(nss);
- // Only initially move chunks when using a hashed shard key
- if (isHashedShardKey && isEmpty) {
- routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "Collection was successfully written as sharded but got dropped before it "
- "could be evenly distributed",
- routingInfo.cm());
-
- auto chunkManager = routingInfo.cm();
-
- // 2. Move and commit each "big chunk" to a different shard.
- int i = 0;
- for (auto chunk : chunkManager->chunks()) {
- const ShardId& shardId = shardIds[i++ % numShards];
- const auto toStatus = shardRegistry->getShard(opCtx, shardId);
- if (!toStatus.isOK()) {
- continue;
- }
- const auto to = toStatus.getValue();
-
- // Can't move chunk to shard it's already on
- if (to->getId() == chunk->getShardId()) {
- continue;
- }
-
- ChunkType chunkType;
- chunkType.setNS(nss.ns());
- chunkType.setMin(chunk->getMin());
- chunkType.setMax(chunk->getMax());
- chunkType.setShard(chunk->getShardId());
- chunkType.setVersion(chunkManager->getVersion());
-
- Status moveStatus = configsvr_client::moveChunk(
- opCtx,
- chunkType,
- to->getId(),
- Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
- MigrationSecondaryThrottleOptions::create(
- MigrationSecondaryThrottleOptions::kOff),
- true);
- if (!moveStatus.isOK()) {
- warning() << "couldn't move chunk " << redact(chunk->toString()) << " to shard "
- << *to << " while sharding collection " << nss.ns()
- << causedBy(redact(moveStatus));
- }
- }
-
- if (allSplits.empty()) {
- return true;
- }
+ // Free the collection dist lock in order to allow the initial splits and moves below to
+ // proceed.
+ scopedDistLock.reset();
- // Reload the config info, after all the migrations
- catalogCache->invalidateShardedCollection(nss);
- routingInfo = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "Collection was successfully written as sharded but got dropped before it "
- "could be evenly distributed",
- routingInfo.cm());
- chunkManager = routingInfo.cm();
-
- // 3. Subdivide the big chunks by splitting at each of the points in "allSplits"
- // that we haven't already split by.
- auto currentChunk =
- chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[0]);
-
- std::vector<BSONObj> subSplits;
- for (unsigned i = 0; i <= allSplits.size(); i++) {
- if (i == allSplits.size() || !currentChunk->containsKey(allSplits[i])) {
- if (!subSplits.empty()) {
- auto splitStatus = shardutil::splitChunkAtMultiplePoints(
- opCtx,
- currentChunk->getShardId(),
- nss,
- chunkManager->getShardKeyPattern(),
- chunkManager->getVersion(),
- ChunkRange(currentChunk->getMin(), currentChunk->getMax()),
- subSplits);
- if (!splitStatus.isOK()) {
- warning() << "couldn't split chunk " << redact(currentChunk->toString())
- << " while sharding collection " << nss.ns()
- << causedBy(redact(splitStatus.getStatus()));
- }
-
- subSplits.clear();
- }
-
- if (i < allSplits.size()) {
- currentChunk =
- chunkManager->findIntersectingChunkWithSimpleCollation(allSplits[i]);
- }
- } else {
- BSONObj splitPoint(allSplits[i]);
-
- // Do not split on the boundaries
- if (currentChunk->getMin().woCompare(splitPoint) == 0) {
- continue;
- }
-
- subSplits.push_back(splitPoint);
- }
- }
- }
+ // Step 6. Migrate initial chunks to distribute them across shards.
+ migrateAndFurtherSplitInitialChunks(
+ opCtx, nss, numShards, shardIds, isEmpty, shardKeyPattern, allSplits);
return true;
}