/**
 * Copyright (C) 2017 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/db/s/config/sharding_catalog_manager.h"

#include <set>
#include <vector>

#include "mongo/base/status_with.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connection_string.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
#include "mongo/s/catalog/sharding_catalog_client_impl.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/set_shard_version_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/scopeguard.h"

namespace mongo {

using CollectionUUID = UUID;
using std::string;
using std::vector;
using std::set;

namespace {

const Seconds kDefaultFindHostMaxWaitTime(20);

const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
const char kWriteConcernField[] = "writeConcern";

void checkForExistingChunks(OperationContext* opCtx, const NamespaceString& nss) {
    BSONObjBuilder countBuilder;
    countBuilder.append("count",
                        ChunkType::ConfigNS.coll());
    countBuilder.append("query", BSON(ChunkType::ns(nss.ns())));

    // OK to use limit=1, since if any chunks exist, we will fail.
    countBuilder.append("limit", 1);

    // Use readConcern local to guarantee we see any chunks that have been written and may
    // become committed; readConcern majority will not see the chunks if they have not made it
    // to the majority snapshot.
    repl::ReadConcernArgs readConcern(repl::ReadConcernLevel::kLocalReadConcern);
    readConcern.appendInfo(&countBuilder);

    auto cmdResponse = uassertStatusOK(
        Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            opCtx,
            kConfigReadSelector,
            ChunkType::ConfigNS.db().toString(),
            countBuilder.done(),
            Shard::kDefaultConfigCommandTimeout,
            Shard::RetryPolicy::kIdempotent));
    uassertStatusOK(cmdResponse.commandStatus);

    long long numChunks;
    uassertStatusOK(bsonExtractIntegerField(cmdResponse.response, "n", &numChunks));
    uassert(ErrorCodes::ManualInterventionRequired,
            str::stream() << "A previous attempt to shard collection " << nss.ns()
                          << " failed after writing some initial chunks to config.chunks. Please "
                             "manually delete the partially written chunks for collection "
                          << nss.ns()
                          << " from config.chunks",
            numChunks == 0);
}

boost::optional<UUID> checkCollectionOptions(OperationContext* opCtx,
                                             Shard* shard,
                                             const NamespaceString& ns,
                                             const CollectionOptions options) {
    BSONObjBuilder listCollCmd;
    listCollCmd.append("listCollections", 1);
    listCollCmd.append("filter", BSON("name" << ns.coll()));

    auto response = uassertStatusOK(
        shard->runCommandWithFixedRetryAttempts(opCtx,
                                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                                ns.db().toString(),
                                                listCollCmd.obj(),
                                                Shard::RetryPolicy::kIdempotent));

    auto cursorObj = response.response["cursor"].Obj();
    auto collections = cursorObj["firstBatch"].Obj();
    BSONObjIterator collIter(collections);
    uassert(ErrorCodes::NamespaceNotFound,
            str::stream() << "cannot find ns: " << ns.ns(),
            collIter.more());

    auto collectionDetails = collIter.next();
    CollectionOptions actualOptions;
    uassertStatusOK(actualOptions.parse(collectionDetails["options"].Obj()));
    // TODO: SERVER-33048 check idIndex field

    uassert(ErrorCodes::NamespaceExists,
            str::stream() << "ns: " << ns.ns() << " already exists with different options: "
                          << actualOptions.toBSON(),
            options.matchesStorageOptions(
                actualOptions, CollatorFactoryInterface::get(opCtx->getServiceContext())));

    if (actualOptions.isView()) {
        // Views don't have a UUID.
        return boost::none;
    }

    auto collectionInfo = collectionDetails["info"].Obj();
    return uassertStatusOK(UUID::parse(collectionInfo["uuid"]));
}

}  // namespace

/**
 * Creates and writes to the config server the first chunks for a newly sharded collection. Returns
 * the version generated for the collection.
 */
ChunkVersion ShardingCatalogManager::_createFirstChunks(OperationContext* opCtx,
                                                        const NamespaceString& nss,
                                                        const ShardKeyPattern& shardKeyPattern,
                                                        const ShardId& primaryShardId,
                                                        const std::vector<BSONObj>& initPoints,
                                                        const bool distributeInitialChunks) {
    const KeyPattern keyPattern = shardKeyPattern.getKeyPattern();

    vector<BSONObj> splitPoints;
    vector<ShardId> shardIds;

    std::string primaryShardName = primaryShardId.toString();
    auto drainingCount = uassertStatusOK(_runCountCommandOnConfig(
        opCtx,
        NamespaceString(ShardType::ConfigNS),
        BSON(ShardType::name() << primaryShardName << ShardType::draining(true))));
    const bool primaryDraining = (drainingCount > 0);

    auto getPrimaryOrFirstNonDrainingShard = [&opCtx, primaryShardId, primaryDraining]() {
        if (primaryDraining) {
            vector<ShardId> allShardIds;
            Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&allShardIds);

            auto dbShardId = allShardIds[0];
            if (allShardIds[0] == primaryShardId && allShardIds.size() > 1) {
                dbShardId = allShardIds[1];
            }

            return dbShardId;
        } else {
            return primaryShardId;
        }
    };

    if (initPoints.empty()) {
        // If no split points were specified, use the shard's data distribution to determine them.
        auto primaryShard =
            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));

        auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
            nss.db().toString(),
            BSON("count" << nss.coll()),
            Shard::RetryPolicy::kIdempotent));

        long long numObjects = 0;
        uassertStatusOK(result.commandStatus);
        uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects));

        // Refresh the balancer settings to ensure the chunk size setting, which is sent as part of
        // the splitVector command and affects the number of chunks returned, has been loaded.
        uassertStatusOK(Grid::get(opCtx)->getBalancerConfiguration()->refreshAndCheck(opCtx));

        if (numObjects > 0) {
            splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
                opCtx,
                primaryShardId,
                nss,
                shardKeyPattern,
                ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
                Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
                0));
        }

        // If docs already exist for the collection, must use the primary shard;
        // otherwise defer to the passed-in distribution option.
        if (numObjects == 0 && distributeInitialChunks) {
            Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
            if (primaryDraining && shardIds.size() > 1) {
                shardIds.erase(std::remove(shardIds.begin(), shardIds.end(), primaryShardId),
                               shardIds.end());
            }
        } else {
            shardIds.push_back(getPrimaryOrFirstNonDrainingShard());
        }
    } else {
        // Make sure points are unique and ordered
        auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();

        for (const auto& initPoint : initPoints) {
            orderedPts.insert(initPoint);
        }

        for (const auto& initPoint : orderedPts) {
            splitPoints.push_back(initPoint);
        }

        if (distributeInitialChunks) {
            Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
            if (primaryDraining) {
                shardIds.erase(std::remove(shardIds.begin(), shardIds.end(), primaryShardId),
                               shardIds.end());
            }
        } else {
            shardIds.push_back(getPrimaryOrFirstNonDrainingShard());
        }
    }

    // This is the first chunk; start the versioning from scratch
    const OID epoch = OID::gen();
    ChunkVersion version(1, 0, epoch);

    log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << nss
          << " using new epoch " << version.epoch();

    const auto validAfter = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();

    for (unsigned i = 0; i <= splitPoints.size(); i++) {
        const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1];
        const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax();

        // The correct version must be returned as part of this call, so only increment for
        // versions which actually get written.
        if (i > 0) {
            version.incMinor();
        }

        ChunkType chunk;
        chunk.setNS(nss);
        chunk.setMin(min);
        chunk.setMax(max);
        chunk.setShard(shardIds[i % shardIds.size()]);
        chunk.setVersion(version);

        if (serverGlobalParams.featureCompatibility.getVersion() >=
            ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) {
            std::vector<ChunkHistory> initialHistory;
            initialHistory.emplace_back(ChunkHistory(validAfter, shardIds[i % shardIds.size()]));
            chunk.setHistory(std::move(initialHistory));
        }

        uassertStatusOK(Grid::get(opCtx)->catalogClient()->insertConfigDocument(
            opCtx,
            ChunkType::ConfigNS,
            chunk.toConfigBSON(),
            ShardingCatalogClient::kMajorityWriteConcern));
    }

    return version;
}

Status ShardingCatalogManager::dropCollection(OperationContext* opCtx, const NamespaceString& nss) {
    const auto catalogClient = Grid::get(opCtx)->catalogClient();
    catalogClient
        ->logChange(opCtx,
                    "dropCollection.start",
                    nss.ns(),
                    BSONObj(),
                    ShardingCatalogClientImpl::kMajorityWriteConcern)
        .ignore();

    auto shardsStatus =
        catalogClient->getAllShards(opCtx, repl::ReadConcernLevel::kLocalReadConcern);
    if (!shardsStatus.isOK()) {
        return shardsStatus.getStatus();
    }
    vector<ShardType> allShards = std::move(shardsStatus.getValue().value);

    LOG(1) << "dropCollection " << nss.ns() << " started";

    const auto dropCommandBSON = [opCtx, &nss] {
        BSONObjBuilder builder;
        builder.append("drop", nss.coll());

        if (!opCtx->getWriteConcern().usedDefault) {
            builder.append(WriteConcernOptions::kWriteConcernField,
                           opCtx->getWriteConcern().toBSON());
        }

        return builder.obj();
    }();

    std::map<std::string, BSONObj> errors;
    auto* const shardRegistry = Grid::get(opCtx)->shardRegistry();

    for (const auto& shardEntry : allShards) {
        auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
        if (!swShard.isOK()) {
            return swShard.getStatus();
        }

        const auto& shard = swShard.getValue();

        auto swDropResult = shard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            nss.db().toString(),
            dropCommandBSON,
            Shard::RetryPolicy::kIdempotent);
        if (!swDropResult.isOK()) {
            return swDropResult.getStatus().withContext(
                str::stream() << "Error dropping collection on shard " << shardEntry.getName());
        }

        auto& dropResult = swDropResult.getValue();

        auto dropStatus = std::move(dropResult.commandStatus);
        auto wcStatus = std::move(dropResult.writeConcernStatus);
        if (!dropStatus.isOK() || !wcStatus.isOK()) {
            if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) {
                // Generally getting NamespaceNotFound is okay to ignore as it simply means that
                // the collection has already been dropped or doesn't exist on this shard.
                // If, however, we get NamespaceNotFound but also have a write concern error then we
                // can't confirm whether the fact that the namespace doesn't exist is actually
                // committed. Thus we must still fail on NamespaceNotFound if there is also a write
                // concern error. This can happen if we call drop, it succeeds but with a write
                // concern error, then we retry the drop.
                continue;
            }

            errors.emplace(shardEntry.getHost(), std::move(dropResult.response));
        }
    }

    if (!errors.empty()) {
        StringBuilder sb;
        sb << "Dropping collection failed on the following hosts: ";

        for (auto it = errors.cbegin(); it != errors.cend(); ++it) {
            if (it != errors.cbegin()) {
                sb << ", ";
            }

            sb << it->first << ": " << it->second;
        }

        return {ErrorCodes::OperationFailed, sb.str()};
    }

    LOG(1) << "dropCollection " << nss.ns() << " shard data deleted";

    // Remove chunk data
    Status result =
        catalogClient->removeConfigDocuments(opCtx,
                                             ChunkType::ConfigNS,
                                             BSON(ChunkType::ns(nss.ns())),
                                             ShardingCatalogClient::kMajorityWriteConcern);
    if (!result.isOK()) {
        return result;
    }

    LOG(1) << "dropCollection " << nss.ns() << " chunk data deleted";

    // Mark the collection as dropped
    CollectionType coll;
    coll.setNs(nss);
    coll.setDropped(true);
    coll.setEpoch(ChunkVersion::DROPPED().epoch());
    coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now());

    const bool upsert = false;
    result = ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection(
        opCtx, nss, coll, upsert);
    if (!result.isOK()) {
        return result;
    }

    LOG(1) << "dropCollection " << nss.ns() << " collection marked as dropped";

    for (const auto& shardEntry : allShards) {
        auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName());
        if (!swShard.isOK()) {
            return swShard.getStatus();
        }

        const auto& shard = swShard.getValue();

        SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
            shardRegistry->getConfigServerConnectionString(),
            shardEntry.getName(),
            fassert(28781, ConnectionString::parse(shardEntry.getHost())),
            nss,
            ChunkVersion::DROPPED(),
            true);

        auto ssvResult = shard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            ssv.toBSON(),
            Shard::RetryPolicy::kIdempotent);

        if (!ssvResult.isOK()) {
            return ssvResult.getStatus();
        }

        auto ssvStatus = std::move(ssvResult.getValue().commandStatus);
        if (!ssvStatus.isOK()) {
            return ssvStatus;
        }

        auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            BSON("unsetSharding" << 1),
            Shard::RetryPolicy::kIdempotent);

        if (!unsetShardingStatus.isOK()) {
            return unsetShardingStatus.getStatus();
        }

        auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus);
        if (!unsetShardingResult.isOK()) {
            return unsetShardingResult;
        }
    }

    LOG(1) << "dropCollection " << nss.ns() << " completed";

    catalogClient
        ->logChange(opCtx,
                    "dropCollection",
                    nss.ns(),
                    BSONObj(),
                    ShardingCatalogClientImpl::kMajorityWriteConcern)
        .ignore();

    return Status::OK();
}

void ShardingCatalogManager::shardCollection(OperationContext* opCtx,
                                             const NamespaceString& nss,
                                             const boost::optional<UUID> uuid,
                                             const ShardKeyPattern& fieldsAndOrder,
                                             const BSONObj& defaultCollation,
                                             bool unique,
                                             const vector<BSONObj>& initPoints,
                                             const bool distributeInitialChunks,
                                             const ShardId& dbPrimaryShardId) {
    const auto catalogClient = Grid::get(opCtx)->catalogClient();
    const auto shardRegistry = Grid::get(opCtx)->shardRegistry();

    const auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId));

    // Fail if there are partially written chunks from a previous failed shardCollection.
    checkForExistingChunks(opCtx, nss);

    // Record start in changelog
    {
        BSONObjBuilder collectionDetail;
        collectionDetail.append("shardKey", fieldsAndOrder.toBSON());
        collectionDetail.append("collection", nss.ns());
        if (uuid) {
            uuid->appendToBuilder(&collectionDetail, "uuid");
        }
        collectionDetail.append("primary", primaryShard->toString());
        collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1));
        catalogClient
            ->logChange(opCtx,
                        "shardCollection.start",
                        nss.ns(),
                        collectionDetail.obj(),
                        ShardingCatalogClient::kMajorityWriteConcern)
            .transitional_ignore();
    }

    // Construct the collection default collator.
    std::unique_ptr<CollatorInterface> defaultCollator;
    if (!defaultCollation.isEmpty()) {
        defaultCollator = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
                                              ->makeFromBSON(defaultCollation));
    }

    const auto& collVersion = _createFirstChunks(
        opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, distributeInitialChunks);

    {
        CollectionType coll;
        coll.setNs(nss);
        if (uuid) {
            coll.setUUID(*uuid);
        }
        coll.setEpoch(collVersion.epoch());

        // TODO(schwerin): The following isn't really a date, but is stored as one in-memory and in
        // config.collections, as a historical oddity.
        coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(collVersion.toLong()));

        coll.setKeyPattern(fieldsAndOrder.toBSON());
        coll.setDefaultCollation(defaultCollator ? defaultCollator->getSpec().toBSON() : BSONObj());
        coll.setUnique(unique);

        uassertStatusOK(ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection(
            opCtx, nss, coll, true /*upsert*/));
    }

    auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, dbPrimaryShardId));
    invariant(!shard->isConfig());

    // Tell the primary mongod to refresh its data
    // TODO: Think the real fix here is for mongos to just
    //       assume that all collections are sharded, when we get there
    SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist(
        shardRegistry->getConfigServerConnectionString(),
        dbPrimaryShardId,
        primaryShard->getConnString(),
        nss,
        collVersion,
        true);

    auto ssvResponse =
        shard->runCommandWithFixedRetryAttempts(opCtx,
                                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                                                "admin",
                                                ssv.toBSON(),
                                                Shard::RetryPolicy::kIdempotent);
    auto status = ssvResponse.isOK() ? std::move(ssvResponse.getValue().commandStatus)
                                     : std::move(ssvResponse.getStatus());
    if (!status.isOK()) {
        warning() << "could not update initial version of " << nss.ns() << " on shard primary "
                  << dbPrimaryShardId << causedBy(redact(status));
    }

    catalogClient
        ->logChange(opCtx,
                    "shardCollection.end",
                    nss.ns(),
                    BSON("version" << collVersion.toString()),
                    ShardingCatalogClient::kMajorityWriteConcern)
        .transitional_ignore();
}

void ShardingCatalogManager::generateUUIDsForExistingShardedCollections(OperationContext* opCtx) {
    // Retrieve all collections in config.collections that do not have a UUID.
    // Some collections may already have a UUID if an earlier upgrade attempt failed after making
    // some progress.
    auto shardedColls =
        uassertStatusOK(
            Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
                opCtx,
                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
                repl::ReadConcernLevel::kLocalReadConcern,
                CollectionType::ConfigNS,
                BSON(CollectionType::uuid.name() << BSON("$exists" << false) << "dropped" << false),
                BSONObj(),   // sort
                boost::none  // limit
                ))
            .docs;

    if (shardedColls.empty()) {
        LOG(0) << "all sharded collections already have UUIDs";

        // We did a local read of the collections collection above and found that all sharded
        // collections already have UUIDs. However, the data may not be majority committed (a
        // previous setFCV attempt may have failed with a write concern error). Since the current
        // Client doesn't know the opTime of the last write to the collections collection, make it
        // wait for the last opTime in the system when we wait for writeConcern.
        repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
        return;
    }

    // Generate and persist a new UUID for each collection that did not have a UUID.
    LOG(0) << "generating UUIDs for " << shardedColls.size()
           << " sharded collections that do not yet have a UUID";
    for (auto& coll : shardedColls) {
        auto collType = uassertStatusOK(CollectionType::fromBSON(coll));
        invariant(!collType.getUUID());

        auto uuid = CollectionUUID::gen();
        collType.setUUID(uuid);

        uassertStatusOK(ShardingCatalogClientImpl::updateShardingCatalogEntryForCollection(
            opCtx, collType.getNs(), collType, false /* upsert */));

        LOG(2) << "updated entry in config.collections for sharded collection " << collType.getNs()
               << " with generated UUID " << uuid;
    }
}

void ShardingCatalogManager::createCollection(OperationContext* opCtx,
                                              const NamespaceString& ns,
                                              const CollectionOptions& collOptions) {
    const auto catalogClient = Grid::get(opCtx)->catalogClient();
    auto shardRegistry = Grid::get(opCtx)->shardRegistry();

    auto dbEntry =
        uassertStatusOK(catalogClient->getDatabase(
                            opCtx, ns.db().toString(), repl::ReadConcernLevel::kLocalReadConcern))
            .value;
    const auto& primaryShardId = dbEntry.getPrimary();
    auto primaryShard = uassertStatusOK(shardRegistry->getShard(opCtx, primaryShardId));

    BSONObjBuilder createCmdBuilder;
    createCmdBuilder.append("create", ns.coll());
    collOptions.appendBSON(&createCmdBuilder);
    createCmdBuilder.append(kWriteConcernField, opCtx->getWriteConcern().toBSON());

    auto swResponse = primaryShard->runCommandWithFixedRetryAttempts(
        opCtx,
        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
        ns.db().toString(),
        createCmdBuilder.obj(),
        Shard::RetryPolicy::kIdempotent);

    auto createStatus = Shard::CommandResponse::getEffectiveStatus(swResponse);
    if (!createStatus.isOK() && createStatus != ErrorCodes::NamespaceExists) {
        uassertStatusOK(createStatus);
    }

    checkCollectionOptions(opCtx, primaryShard.get(), ns, collOptions);

    // TODO: SERVER-33094 use UUID returned to write config.collections entries.

    // Make sure to advance the opTime if writes didn't occur during the execution of this
    // command. This is to ensure that this request will wait for the opTime that at least
    // reflects the current state (that this command observed) while waiting for replication
    // to satisfy the write concern.
    repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
}

}  // namespace mongo