/**
 *    Copyright (C) 2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/catalog/sharding_catalog_client_impl.h"

#include <iomanip>
#include <pcrecpp.h>

#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/audit.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/s/balancer_configuration.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/dist_lock_manager.h"
#include "mongo/s/catalog/type_changelog.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/set_shard_version_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/shard_util.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"

namespace mongo {

MONGO_FP_DECLARE(failApplyChunkOps);
MONGO_FP_DECLARE(setDropCollDistLockWait);
using repl::OpTime;
using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using str::stream;

namespace {

const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{});
const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred,
                                                            TagSet{});
const int kMaxReadRetry = 3;
const int kMaxWriteRetry = 3;

const std::string kActionLogCollectionName("actionlog");
const int kActionLogCollectionSizeMB = 2 * 1024 * 1024;

const std::string kChangeLogCollectionName("changelog");
const int kChangeLogCollectionSizeMB = 10 * 1024 * 1024;

const NamespaceString kSettingsNamespace("config", "settings");

void toBatchError(const Status& status, BatchedCommandResponse* response) {
    response->clear();
    response->setErrCode(status.code());
    response->setErrMessage(status.reason());
    response->setOk(false);
}

/**
 * Creates and writes to the config server the first chunks for a newly sharded collection. Returns
 * the version generated for the collection.
 */
StatusWith<ChunkVersion> createFirstChunks(OperationContext* opCtx,
                                           const NamespaceString& nss,
                                           const ShardKeyPattern& shardKeyPattern,
                                           const ShardId& primaryShardId,
                                           const std::vector<BSONObj>& initPoints,
                                           const std::set<ShardId>& initShardIds) {
    const KeyPattern keyPattern = shardKeyPattern.getKeyPattern();

    vector<BSONObj> splitPoints;
    vector<ShardId> shardIds;

    if (initPoints.empty()) {
        // If no split points were specified use the shard's data distribution to determine them
        auto primaryShard =
            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, primaryShardId));

        auto result = uassertStatusOK(primaryShard->runCommandWithFixedRetryAttempts(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
            nss.db().toString(),
            BSON("count" << nss.coll()),
            Shard::RetryPolicy::kIdempotent));

        long long numObjects = 0;
        uassertStatusOK(result.commandStatus);
        uassertStatusOK(bsonExtractIntegerField(result.response, "n", &numObjects));

        if (numObjects > 0) {
            splitPoints = uassertStatusOK(shardutil::selectChunkSplitPoints(
                opCtx,
                primaryShardId,
                nss,
                shardKeyPattern,
                ChunkRange(keyPattern.globalMin(), keyPattern.globalMax()),
                Grid::get(opCtx)->getBalancerConfiguration()->getMaxChunkSizeBytes(),
                0));
        }

        // Since docs already exist for the collection, must use primary shard
        shardIds.push_back(primaryShardId);
    } else {
        // Make sure points are unique and ordered
        auto orderedPts = SimpleBSONObjComparator::kInstance.makeBSONObjSet();

        for (const auto& initPoint : initPoints) {
            orderedPts.insert(initPoint);
        }

        for (const auto& initPoint : orderedPts) {
            splitPoints.push_back(initPoint);
        }

        if (initShardIds.empty()) {
            // If not specified, only use the primary shard (note that it's not safe for mongos to
            // put initial chunks on other shards without the primary mongod knowing)
            shardIds.push_back(primaryShardId);
        } else {
            std::copy(initShardIds.begin(), initShardIds.end(), std::back_inserter(shardIds));
        }
    }

    // This is the first chunk; start the versioning from scratch
    const OID epoch = OID::gen();
    ChunkVersion version(1, 0, epoch);

    log() << "going to create " << splitPoints.size() + 1 << " chunk(s) for: " << nss
          << " using new epoch " << version.epoch();

    for (unsigned i = 0; i <= splitPoints.size(); i++) {
        const BSONObj min = (i == 0) ? keyPattern.globalMin() : splitPoints[i - 1];
        const BSONObj max = (i < splitPoints.size()) ? splitPoints[i] : keyPattern.globalMax();

        // The correct version must be returned as part of this call so only increment for versions,
        // which get written
        if (i > 0) {
            version.incMinor();
        }

        ChunkType chunk;
        chunk.setNS(nss.ns());
        chunk.setMin(min);
        chunk.setMax(max);
        chunk.setShard(shardIds[i % shardIds.size()]);
        chunk.setVersion(version);

        Status status = Grid::get(opCtx)->catalogClient(opCtx)->insertConfigDocument(
            opCtx,
            ChunkType::ConfigNS,
            chunk.toConfigBSON(),
            ShardingCatalogClient::kMajorityWriteConcern);
        if (!status.isOK()) {
            return {status.code(),
                    str::stream() << "Creating first chunks failed due to "
                                  << redact(status.reason())};
        }
    }

    return version;
}

}  // namespace

ShardingCatalogClientImpl::ShardingCatalogClientImpl(
    std::unique_ptr<DistLockManager> distLockManager)
    : _distLockManager(std::move(distLockManager)) {}

ShardingCatalogClientImpl::~ShardingCatalogClientImpl() = default;

Status ShardingCatalogClientImpl::startup() {
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (_started) {
        return Status::OK();
    }
    _started = true;
    _distLockManager->startUp();
    return Status::OK();
}

void ShardingCatalogClientImpl::shutDown(OperationContext* opCtx) {
    LOG(1) << "ShardingCatalogClientImpl::shutDown() called.";
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _inShutdown = true;
    }

    invariant(_distLockManager);
    _distLockManager->shutDown(opCtx);
}

Status ShardingCatalogClientImpl::_updateCollection(OperationContext* opCtx,
                                                    const std::string& collNs,
                                                    const CollectionType& coll,
                                                    bool upsert) {
    fassert(28634, coll.validate());

    auto status = updateConfigDocument(opCtx,
                                       CollectionType::ConfigNS,
                                       BSON(CollectionType::fullNs(collNs)),
                                       coll.toBSON(),
                                       upsert,
                                       ShardingCatalogClient::kMajorityWriteConcern);
    if (!status.isOK()) {
        return {status.getStatus().code(),
                str::stream() << "Collection metadata write failed due to "
                              << status.getStatus().reason()};
    }

    return Status::OK();
}

Status ShardingCatalogClientImpl::updateDatabase(OperationContext* opCtx,
                                                 const std::string& dbName,
                                                 const DatabaseType& db) {
    fassert(28616, db.validate());

    auto status = updateConfigDocument(opCtx,
                                       DatabaseType::ConfigNS,
                                       BSON(DatabaseType::name(dbName)),
                                       db.toBSON(),
                                       true,
                                       ShardingCatalogClient::kMajorityWriteConcern);
    if (!status.isOK()) {
        return {status.getStatus().code(),
                str::stream() << "Database metadata write failed due to "
                              << status.getStatus().reason()};
    }

    return Status::OK();
}

Status ShardingCatalogClientImpl::createDatabase(OperationContext* opCtx,
                                                 const std::string& dbName) {
    invariant(nsIsDbOnly(dbName));

    // The admin and config databases should never be explicitly created. They "just exist",
    // i.e. getDatabase will always return an entry for them.
    invariant(dbName != "admin");
    invariant(dbName != "config");

    // Lock the database globally to prevent conflicts with simultaneous database creation.
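    // The distributed lock acquired below is scoped: it is released automatically when
    // 'scopedDistLock' goes out of scope, whether this function returns success or an error.
    // While the lock is held we check for an existing database entry (including
    // case-insensitive name clashes), pick the least-loaded shard as the primary, and insert
    // the new config.databases document; a DuplicateKey result from that insert is translated
    // into NamespaceExists below.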
auto scopedDistLock = getDistLockManager()->lock( opCtx, dbName, "createDatabase", DistLockManager::kDefaultLockTimeout); if (!scopedDistLock.isOK()) { return scopedDistLock.getStatus(); } // check for case sensitivity violations Status status = _checkDbDoesNotExist(opCtx, dbName, nullptr); if (!status.isOK()) { return status; } // Database does not exist, pick a shard and create a new entry auto newShardIdStatus = _selectShardForNewDatabase(opCtx, Grid::get(opCtx)->shardRegistry()); if (!newShardIdStatus.isOK()) { return newShardIdStatus.getStatus(); } const ShardId& newShardId = newShardIdStatus.getValue(); log() << "Placing [" << dbName << "] on: " << newShardId; DatabaseType db; db.setName(dbName); db.setPrimary(newShardId); db.setSharded(false); status = insertConfigDocument( opCtx, DatabaseType::ConfigNS, db.toBSON(), ShardingCatalogClient::kMajorityWriteConcern); if (status.code() == ErrorCodes::DuplicateKey) { return Status(ErrorCodes::NamespaceExists, "database " + dbName + " already exists"); } return status; } Status ShardingCatalogClientImpl::logAction(OperationContext* opCtx, const std::string& what, const std::string& ns, const BSONObj& detail) { if (_actionLogCollectionCreated.load() == 0) { Status result = _createCappedConfigCollection(opCtx, kActionLogCollectionName, kActionLogCollectionSizeMB, ShardingCatalogClient::kMajorityWriteConcern); if (result.isOK()) { _actionLogCollectionCreated.store(1); } else { log() << "couldn't create config.actionlog collection:" << causedBy(result); return result; } } return _log(opCtx, kActionLogCollectionName, what, ns, detail, ShardingCatalogClient::kMajorityWriteConcern); } Status ShardingCatalogClientImpl::logChange(OperationContext* opCtx, const std::string& what, const std::string& ns, const BSONObj& detail, const WriteConcernOptions& writeConcern) { invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || writeConcern.wMode == WriteConcernOptions::kMajority); if (_changeLogCollectionCreated.load() == 0) { Status result = _createCappedConfigCollection( opCtx, kChangeLogCollectionName, kChangeLogCollectionSizeMB, writeConcern); if (result.isOK()) { _changeLogCollectionCreated.store(1); } else { log() << "couldn't create config.changelog collection:" << causedBy(result); return result; } } return _log(opCtx, kChangeLogCollectionName, what, ns, detail, writeConcern); } // static StatusWith ShardingCatalogClientImpl::_selectShardForNewDatabase( OperationContext* opCtx, ShardRegistry* shardRegistry) { vector allShardIds; shardRegistry->getAllShardIds(&allShardIds); if (allShardIds.empty()) { shardRegistry->reload(opCtx); shardRegistry->getAllShardIds(&allShardIds); if (allShardIds.empty()) { return Status(ErrorCodes::ShardNotFound, "No shards found"); } } ShardId candidateShardId = allShardIds[0]; auto candidateSizeStatus = shardutil::retrieveTotalShardSize(opCtx, candidateShardId); if (!candidateSizeStatus.isOK()) { return candidateSizeStatus.getStatus(); } for (size_t i = 1; i < allShardIds.size(); i++) { const ShardId shardId = allShardIds[i]; const auto sizeStatus = shardutil::retrieveTotalShardSize(opCtx, shardId); if (!sizeStatus.isOK()) { return sizeStatus.getStatus(); } if (sizeStatus.getValue() < candidateSizeStatus.getValue()) { candidateSizeStatus = sizeStatus; candidateShardId = shardId; } } return candidateShardId; } Status ShardingCatalogClientImpl::enableSharding(OperationContext* opCtx, const std::string& dbName) { invariant(nsIsDbOnly(dbName)); if (dbName == NamespaceString::kConfigDb || dbName == 
NamespaceString::kAdminDb) { return { ErrorCodes::IllegalOperation, str::stream() << "Enabling sharding on system configuration databases is not allowed"}; } // Lock the database globally to prevent conflicts with simultaneous database // creation/modification. auto scopedDistLock = getDistLockManager()->lock( opCtx, dbName, "enableSharding", DistLockManager::kDefaultLockTimeout); if (!scopedDistLock.isOK()) { return scopedDistLock.getStatus(); } // Check for case sensitivity violations DatabaseType db; Status status = _checkDbDoesNotExist(opCtx, dbName, &db); if (status.isOK()) { // Database does not exist, create a new entry auto newShardIdStatus = _selectShardForNewDatabase(opCtx, Grid::get(opCtx)->shardRegistry()); if (!newShardIdStatus.isOK()) { return newShardIdStatus.getStatus(); } const ShardId& newShardId = newShardIdStatus.getValue(); log() << "Placing [" << dbName << "] on: " << newShardId; db.setName(dbName); db.setPrimary(newShardId); db.setSharded(true); } else if (status.code() == ErrorCodes::NamespaceExists) { if (db.getSharded()) { return Status(ErrorCodes::AlreadyInitialized, str::stream() << "sharding already enabled for database " << dbName); } // Database exists, so just update it db.setSharded(true); } else { return status; } log() << "Enabling sharding for database [" << dbName << "] in config db"; return updateDatabase(opCtx, dbName, db); } Status ShardingCatalogClientImpl::_log(OperationContext* opCtx, const StringData& logCollName, const std::string& what, const std::string& operationNS, const BSONObj& detail, const WriteConcernOptions& writeConcern) { Date_t now = Grid::get(opCtx)->getNetwork()->now(); const std::string hostName = Grid::get(opCtx)->getNetwork()->getHostName(); const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen(); ChangeLogType changeLog; changeLog.setChangeId(changeId); changeLog.setServer(hostName); changeLog.setClientAddr(opCtx->getClient()->clientAddress(true)); changeLog.setTime(now); changeLog.setNS(operationNS); changeLog.setWhat(what); changeLog.setDetails(detail); BSONObj changeLogBSON = changeLog.toBSON(); log() << "about to log metadata event into " << logCollName << ": " << redact(changeLogBSON); const NamespaceString nss("config", logCollName); Status result = insertConfigDocument(opCtx, nss.ns(), changeLogBSON, writeConcern); if (!result.isOK()) { warning() << "Error encountered while logging config change with ID [" << changeId << "] into collection " << logCollName << ": " << redact(result); } return result; } Status ShardingCatalogClientImpl::shardCollection(OperationContext* opCtx, const string& ns, const ShardKeyPattern& fieldsAndOrder, const BSONObj& defaultCollation, bool unique, const vector& initPoints, const set& initShardIds) { // Lock the collection globally so that no other mongos can try to shard or drop the collection // at the same time. 
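    // Overall flow while the collection distributed lock is held:
    //   1. Verify the database exists and resolve its primary shard.
    //   2. Sanity-check config.chunks for leftover chunks from a previously failed attempt.
    //   3. Log 'shardCollection.start' in the changelog.
    //   4. Create and persist the initial chunk(s) via createFirstChunks().
    //   5. Upsert the config.collections entry with the new epoch and default collation.
    //   6. Best-effort setShardVersion against the primary shard, then log 'shardCollection.end'.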
auto scopedDistLock = getDistLockManager()->lock( opCtx, ns, "shardCollection", DistLockManager::kDefaultLockTimeout); if (!scopedDistLock.isOK()) { return scopedDistLock.getStatus(); } auto getDBStatus = getDatabase(opCtx, nsToDatabase(ns)); if (!getDBStatus.isOK()) { return getDBStatus.getStatus(); } auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); ShardId dbPrimaryShardId = getDBStatus.getValue().value.getPrimary(); const auto primaryShardStatus = shardRegistry->getShard(opCtx, dbPrimaryShardId); if (!primaryShardStatus.isOK()) { return primaryShardStatus.getStatus(); } { // This is an extra safety check that there aren't any partially written chunks from a // previous failed invocation of 'shardCollection' auto countStatus = _runCountCommandOnConfig( opCtx, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::ns(ns))); if (!countStatus.isOK()) { return countStatus.getStatus(); } if (countStatus.getValue() > 0) { return {ErrorCodes::AlreadyInitialized, str::stream() << "collection " << ns << " already sharded with " << countStatus.getValue() << " chunks."}; } } // Record start in changelog { BSONObjBuilder collectionDetail; collectionDetail.append("shardKey", fieldsAndOrder.toBSON()); collectionDetail.append("collection", ns); collectionDetail.append("primary", primaryShardStatus.getValue()->toString()); { BSONArrayBuilder initialShards(collectionDetail.subarrayStart("initShards")); for (const ShardId& shardId : initShardIds) { initialShards.append(shardId.toString()); } } collectionDetail.append("numChunks", static_cast(initPoints.size() + 1)); logChange(opCtx, "shardCollection.start", ns, collectionDetail.obj(), ShardingCatalogClientImpl::kMajorityWriteConcern); } const NamespaceString nss(ns); // Construct the collection default collator. std::unique_ptr defaultCollator; if (!defaultCollation.isEmpty()) { auto statusWithCollator = CollatorFactoryInterface::get(opCtx->getServiceContext()) ->makeFromBSON(defaultCollation); if (!statusWithCollator.isOK()) { return statusWithCollator.getStatus(); } defaultCollator = std::move(statusWithCollator.getValue()); } auto createFirstChunksStatus = createFirstChunks(opCtx, nss, fieldsAndOrder, dbPrimaryShardId, initPoints, initShardIds); if (!createFirstChunksStatus.isOK()) { return createFirstChunksStatus.getStatus(); } const auto& collVersion = createFirstChunksStatus.getValue(); { CollectionType coll; coll.setNs(nss); coll.setEpoch(collVersion.epoch()); // TODO(schwerin): The following isn't really a date, but is stored as one in-memory and in // config.collections, as a historical oddity. coll.setUpdatedAt(Date_t::fromMillisSinceEpoch(collVersion.toLong())); coll.setKeyPattern(fieldsAndOrder.toBSON()); coll.setDefaultCollation(defaultCollator ? 
defaultCollator->getSpec().toBSON() : BSONObj()); coll.setUnique(unique); const bool upsert = true; Status updateCollStatus = _updateCollection(opCtx, ns, coll, upsert); if (!updateCollStatus.isOK()) { return updateCollStatus; } } // Tell the primary mongod to refresh its data // TODO: Think the real fix here is for mongos to just // assume that all collections are sharded, when we get there SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( shardRegistry->getConfigServerConnectionString(), dbPrimaryShardId, primaryShardStatus.getValue()->getConnString(), NamespaceString(ns), collVersion, true); auto shardStatus = shardRegistry->getShard(opCtx, dbPrimaryShardId); if (!shardStatus.isOK()) { return shardStatus.getStatus(); } auto shard = shardStatus.getValue(); auto ssvResponse = shard->runCommandWithFixedRetryAttempts(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", ssv.toBSON(), Shard::RetryPolicy::kIdempotent); auto status = ssvResponse.isOK() ? std::move(ssvResponse.getValue().commandStatus) : std::move(ssvResponse.getStatus()); if (!status.isOK()) { warning() << "could not update initial version of " << ns << " on shard primary " << dbPrimaryShardId << causedBy(redact(status)); } logChange(opCtx, "shardCollection.end", ns, BSON("version" << collVersion.toString()), ShardingCatalogClientImpl::kMajorityWriteConcern); return Status::OK(); } StatusWith ShardingCatalogClientImpl::removeShard(OperationContext* opCtx, const ShardId& shardId) { // Check preconditions for removing the shard string name = shardId.toString(); auto countStatus = _runCountCommandOnConfig( opCtx, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name() << NE << name << ShardType::draining(true))); if (!countStatus.isOK()) { return countStatus.getStatus(); } if (countStatus.getValue() > 0) { return Status(ErrorCodes::ConflictingOperationInProgress, "Can't have more than one draining shard at a time"); } countStatus = _runCountCommandOnConfig( opCtx, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name() << NE << name)); if (!countStatus.isOK()) { return countStatus.getStatus(); } if (countStatus.getValue() == 0) { return Status(ErrorCodes::IllegalOperation, "Can't remove last shard"); } // Figure out if shard is already draining countStatus = _runCountCommandOnConfig(opCtx, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name() << name << ShardType::draining(true))); if (!countStatus.isOK()) { return countStatus.getStatus(); } auto* const shardRegistry = Grid::get(opCtx)->shardRegistry(); if (countStatus.getValue() == 0) { log() << "going to start draining shard: " << name; auto updateStatus = updateConfigDocument(opCtx, ShardType::ConfigNS, BSON(ShardType::name() << name), BSON("$set" << BSON(ShardType::draining(true))), false, ShardingCatalogClient::kMajorityWriteConcern); if (!updateStatus.isOK()) { log() << "error starting removeShard: " << name << causedBy(redact(updateStatus.getStatus())); return updateStatus.getStatus(); } shardRegistry->reload(opCtx); // Record start in changelog logChange(opCtx, "removeShard.start", "", BSON("shard" << name), ShardingCatalogClientImpl::kMajorityWriteConcern); return ShardDrainingStatus::STARTED; } // Draining has already started, now figure out how many chunks and databases are still on the // shard. 
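    // The remaining work is measured by counting config.chunks documents still assigned to this
    // shard and config.databases documents that still name it as their primary. Draining is only
    // reported as COMPLETED once both counts reach zero.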
countStatus = _runCountCommandOnConfig( opCtx, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(name))); if (!countStatus.isOK()) { return countStatus.getStatus(); } const long long chunkCount = countStatus.getValue(); countStatus = _runCountCommandOnConfig( opCtx, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::primary(name))); if (!countStatus.isOK()) { return countStatus.getStatus(); } const long long databaseCount = countStatus.getValue(); if (chunkCount > 0 || databaseCount > 0) { // Still more draining to do return ShardDrainingStatus::ONGOING; } // Draining is done, now finish removing the shard. log() << "going to remove shard: " << name; audit::logRemoveShard(opCtx->getClient(), name); Status status = removeConfigDocuments(opCtx, ShardType::ConfigNS, BSON(ShardType::name() << name), ShardingCatalogClient::kMajorityWriteConcern); if (!status.isOK()) { log() << "Error concluding removeShard operation on: " << name << "; err: " << status.reason(); return status; } shardConnectionPool.removeHost(name); ReplicaSetMonitor::remove(name); shardRegistry->reload(opCtx); // Record finish in changelog logChange(opCtx, "removeShard", "", BSON("shard" << name), ShardingCatalogClientImpl::kMajorityWriteConcern); return ShardDrainingStatus::COMPLETED; } StatusWith> ShardingCatalogClientImpl::getDatabase( OperationContext* opCtx, const std::string& dbName) { if (!NamespaceString::validDBName(dbName, NamespaceString::DollarInDbNameBehavior::Allow)) { return {ErrorCodes::InvalidNamespace, stream() << dbName << " is not a valid db name"}; } // The two databases that are hosted on the config server are config and admin if (dbName == "config" || dbName == "admin") { DatabaseType dbt; dbt.setName(dbName); dbt.setSharded(false); dbt.setPrimary(ShardId("config")); return repl::OpTimeWith(dbt); } auto result = _fetchDatabaseMetadata(opCtx, dbName, kConfigReadSelector); if (result == ErrorCodes::NamespaceNotFound) { // If we failed to find the database metadata on the 'nearest' config server, try again // against the primary, in case the database was recently created. 
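    // The first read uses the 'nearest' read preference and may have hit a secondary that has
    // not yet replicated a very recent database creation, so NamespaceNotFound from that read is
    // not authoritative. Only if the primary also reports NamespaceNotFound is the miss returned
    // to the caller.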
result = _fetchDatabaseMetadata( opCtx, dbName, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); if (!result.isOK() && (result != ErrorCodes::NamespaceNotFound)) { return {result.getStatus().code(), str::stream() << "Could not confirm non-existence of database " << dbName << " due to " << result.getStatus().reason()}; } } return result; } StatusWith> ShardingCatalogClientImpl::_fetchDatabaseMetadata( OperationContext* opCtx, const std::string& dbName, const ReadPreferenceSetting& readPref) { dassert(dbName != "admin" && dbName != "config"); auto findStatus = _exhaustiveFindOnConfig(opCtx, readPref, repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::name(dbName)), BSONObj(), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& docsWithOpTime = findStatus.getValue(); if (docsWithOpTime.value.empty()) { return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"}; } invariant(docsWithOpTime.value.size() == 1); auto parseStatus = DatabaseType::fromBSON(docsWithOpTime.value.front()); if (!parseStatus.isOK()) { return parseStatus.getStatus(); } return repl::OpTimeWith(parseStatus.getValue(), docsWithOpTime.opTime); } StatusWith> ShardingCatalogClientImpl::getCollection( OperationContext* opCtx, const std::string& collNs) { auto statusFind = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(CollectionType::ConfigNS), BSON(CollectionType::fullNs(collNs)), BSONObj(), 1); if (!statusFind.isOK()) { return statusFind.getStatus(); } const auto& retOpTimePair = statusFind.getValue(); const auto& retVal = retOpTimePair.value; if (retVal.empty()) { return Status(ErrorCodes::NamespaceNotFound, stream() << "collection " << collNs << " not found"); } invariant(retVal.size() == 1); auto parseStatus = CollectionType::fromBSON(retVal.front()); if (!parseStatus.isOK()) { return parseStatus.getStatus(); } auto collType = parseStatus.getValue(); if (collType.getDropped()) { return Status(ErrorCodes::NamespaceNotFound, stream() << "collection " << collNs << " was dropped"); } return repl::OpTimeWith(collType, retOpTimePair.opTime); } Status ShardingCatalogClientImpl::getCollections(OperationContext* opCtx, const std::string* dbName, std::vector* collections, OpTime* opTime) { BSONObjBuilder b; if (dbName) { invariant(!dbName->empty()); b.appendRegex(CollectionType::fullNs(), string(str::stream() << "^" << pcrecpp::RE::QuoteMeta(*dbName) << "\\.")); } auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(CollectionType::ConfigNS), b.obj(), BSONObj(), boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& docsOpTimePair = findStatus.getValue(); for (const BSONObj& obj : docsOpTimePair.value) { const auto collectionResult = CollectionType::fromBSON(obj); if (!collectionResult.isOK()) { collections->clear(); return {ErrorCodes::FailedToParse, str::stream() << "error while parsing " << CollectionType::ConfigNS << " document: " << obj << " : " << collectionResult.getStatus().toString()}; } collections->push_back(collectionResult.getValue()); } if (opTime) { *opTime = docsOpTimePair.opTime; } return Status::OK(); } Status ShardingCatalogClientImpl::dropCollection(OperationContext* opCtx, const NamespaceString& ns) { logChange(opCtx, "dropCollection.start", ns.ns(), BSONObj(), ShardingCatalogClientImpl::kMajorityWriteConcern); auto 
shardsStatus = getAllShards(opCtx, repl::ReadConcernLevel::kMajorityReadConcern); if (!shardsStatus.isOK()) { return shardsStatus.getStatus(); } vector allShards = std::move(shardsStatus.getValue().value); LOG(1) << "dropCollection " << ns << " started"; // Lock the collection globally so that split/migrate cannot run Seconds waitFor(DistLockManager::kDefaultLockTimeout); MONGO_FAIL_POINT_BLOCK(setDropCollDistLockWait, customWait) { const BSONObj& data = customWait.getData(); waitFor = Seconds(data["waitForSecs"].numberInt()); } auto scopedDistLock = getDistLockManager()->lock(opCtx, ns.ns(), "drop", waitFor); if (!scopedDistLock.isOK()) { return scopedDistLock.getStatus(); } LOG(1) << "dropCollection " << ns << " locked"; const auto dropCommandBSON = [opCtx, &ns] { BSONObjBuilder builder; builder.append("drop", ns.coll()); if (!opCtx->getWriteConcern().usedDefault) { builder.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); } return builder.obj(); }(); std::map errors; auto* const shardRegistry = Grid::get(opCtx)->shardRegistry(); for (const auto& shardEntry : allShards) { auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); if (!swShard.isOK()) { return swShard.getStatus(); } const auto& shard = swShard.getValue(); auto swDropResult = shard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, ns.db().toString(), dropCommandBSON, Shard::RetryPolicy::kIdempotent); if (!swDropResult.isOK()) { return {swDropResult.getStatus().code(), str::stream() << swDropResult.getStatus().reason() << " at " << shardEntry.getName()}; } auto& dropResult = swDropResult.getValue(); auto dropStatus = std::move(dropResult.commandStatus); auto wcStatus = std::move(dropResult.writeConcernStatus); if (!dropStatus.isOK() || !wcStatus.isOK()) { if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) { // Generally getting NamespaceNotFound is okay to ignore as it simply means that // the collection has already been dropped or doesn't exist on this shard. // If, however, we get NamespaceNotFound but also have a write concern error then we // can't confirm whether the fact that the namespace doesn't exist is actually // committed. Thus we must still fail on NamespaceNotFound if there is also a write // concern error. This can happen if we call drop, it succeeds but with a write // concern error, then we retry the drop. 
continue; } errors.emplace(shardEntry.getHost(), std::move(dropResult.response)); } } if (!errors.empty()) { StringBuilder sb; sb << "Dropping collection failed on the following hosts: "; for (auto it = errors.cbegin(); it != errors.cend(); ++it) { if (it != errors.cbegin()) { sb << ", "; } sb << it->first << ": " << it->second; } return {ErrorCodes::OperationFailed, sb.str()}; } LOG(1) << "dropCollection " << ns << " shard data deleted"; // Remove chunk data Status result = removeConfigDocuments(opCtx, ChunkType::ConfigNS, BSON(ChunkType::ns(ns.ns())), ShardingCatalogClient::kMajorityWriteConcern); if (!result.isOK()) { return result; } LOG(1) << "dropCollection " << ns << " chunk data deleted"; // Mark the collection as dropped CollectionType coll; coll.setNs(ns); coll.setDropped(true); coll.setEpoch(ChunkVersion::DROPPED().epoch()); coll.setUpdatedAt(Grid::get(opCtx)->getNetwork()->now()); const bool upsert = false; result = _updateCollection(opCtx, ns.ns(), coll, upsert); if (!result.isOK()) { return result; } LOG(1) << "dropCollection " << ns << " collection marked as dropped"; for (const auto& shardEntry : allShards) { auto swShard = shardRegistry->getShard(opCtx, shardEntry.getName()); if (!swShard.isOK()) { return swShard.getStatus(); } const auto& shard = swShard.getValue(); SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( shardRegistry->getConfigServerConnectionString(), shardEntry.getName(), fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())), ns, ChunkVersion::DROPPED(), true); auto ssvResult = shard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", ssv.toBSON(), Shard::RetryPolicy::kIdempotent); if (!ssvResult.isOK()) { return ssvResult.getStatus(); } auto ssvStatus = std::move(ssvResult.getValue().commandStatus); if (!ssvStatus.isOK()) { return ssvStatus; } auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", BSON("unsetSharding" << 1), Shard::RetryPolicy::kIdempotent); if (!unsetShardingStatus.isOK()) { return unsetShardingStatus.getStatus(); } auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus); if (!unsetShardingResult.isOK()) { return unsetShardingResult; } } LOG(1) << "dropCollection " << ns << " completed"; logChange(opCtx, "dropCollection", ns.ns(), BSONObj(), ShardingCatalogClientImpl::kMajorityWriteConcern); return Status::OK(); } StatusWith ShardingCatalogClientImpl::getGlobalSettings(OperationContext* opCtx, StringData key) { auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, repl::ReadConcernLevel::kMajorityReadConcern, kSettingsNamespace, BSON("_id" << key), BSONObj(), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& docs = findStatus.getValue().value; if (docs.empty()) { return {ErrorCodes::NoMatchingDocument, str::stream() << "can't find settings document with key: " << key}; } invariant(docs.size() == 1); return docs.front(); } StatusWith ShardingCatalogClientImpl::getConfigVersion( OperationContext* opCtx, repl::ReadConcernLevel readConcern) { auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, kConfigReadSelector, readConcern, NamespaceString(VersionType::ConfigNS), BSONObj(), BSONObj(), boost::none /* no limit */); if (!findStatus.isOK()) { return findStatus.getStatus(); } auto queryResults = findStatus.getValue().docs; if (queryResults.size() > 1) { 
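        // config.version is expected to hold at most a single document; finding more than one
        // indicates an inconsistent config database, so fail rather than pick one arbitrarily.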
return {ErrorCodes::TooManyMatchingDocuments, str::stream() << "should only have 1 document in " << VersionType::ConfigNS}; } if (queryResults.empty()) { VersionType versionInfo; versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion); versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion); versionInfo.setClusterId(OID{}); return versionInfo; } BSONObj versionDoc = queryResults.front(); auto versionTypeResult = VersionType::fromBSON(versionDoc); if (!versionTypeResult.isOK()) { return {versionTypeResult.getStatus().code(), str::stream() << "Unable to parse config.version document " << versionDoc << " due to " << versionTypeResult.getStatus().reason()}; } auto validationStatus = versionTypeResult.getValue().validate(); if (!validationStatus.isOK()) { return Status(validationStatus.code(), str::stream() << "Unable to validate config.version document " << versionDoc << " due to " << validationStatus.reason()); } return versionTypeResult.getValue(); } Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* opCtx, const ShardId& shardId, vector* dbs) { auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::primary(shardId.toString())), BSONObj(), boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } for (const BSONObj& obj : findStatus.getValue().value) { string dbName; Status status = bsonExtractStringField(obj, DatabaseType::name(), &dbName); if (!status.isOK()) { dbs->clear(); return status; } dbs->push_back(dbName); } return Status::OK(); } Status ShardingCatalogClientImpl::getChunks(OperationContext* opCtx, const BSONObj& query, const BSONObj& sort, boost::optional limit, vector* chunks, OpTime* opTime, repl::ReadConcernLevel readConcern) { invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || readConcern == repl::ReadConcernLevel::kMajorityReadConcern); chunks->clear(); // Convert boost::optional to boost::optional. auto longLimit = limit ? 
boost::optional(*limit) : boost::none; auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, readConcern, NamespaceString(ChunkType::ConfigNS), query, sort, longLimit); if (!findStatus.isOK()) { return {findStatus.getStatus().code(), str::stream() << "Failed to load chunks due to " << findStatus.getStatus().reason()}; } const auto& chunkDocsOpTimePair = findStatus.getValue(); for (const BSONObj& obj : chunkDocsOpTimePair.value) { auto chunkRes = ChunkType::fromConfigBSON(obj); if (!chunkRes.isOK()) { chunks->clear(); return {chunkRes.getStatus().code(), stream() << "Failed to parse chunk with id " << obj[ChunkType::name()] << " due to " << chunkRes.getStatus().reason()}; } chunks->push_back(chunkRes.getValue()); } if (opTime) { *opTime = chunkDocsOpTimePair.opTime; } return Status::OK(); } Status ShardingCatalogClientImpl::getTagsForCollection(OperationContext* opCtx, const std::string& collectionNs, std::vector* tags) { tags->clear(); auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(TagsType::ConfigNS), BSON(TagsType::ns(collectionNs)), BSON(TagsType::min() << 1), boost::none); // no limit if (!findStatus.isOK()) { return {findStatus.getStatus().code(), str::stream() << "Failed to load tags due to " << findStatus.getStatus().reason()}; } const auto& tagDocsOpTimePair = findStatus.getValue(); for (const BSONObj& obj : tagDocsOpTimePair.value) { auto tagRes = TagsType::fromBSON(obj); if (!tagRes.isOK()) { tags->clear(); return {tagRes.getStatus().code(), str::stream() << "Failed to parse tag with id " << obj[TagsType::tag()] << " due to " << tagRes.getStatus().toString()}; } tags->push_back(tagRes.getValue()); } return Status::OK(); } StatusWith>> ShardingCatalogClientImpl::getAllShards( OperationContext* opCtx, repl::ReadConcernLevel readConcern) { std::vector shards; auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, readConcern, NamespaceString(ShardType::ConfigNS), BSONObj(), // no query filter BSONObj(), // no sort boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } for (const BSONObj& doc : findStatus.getValue().value) { auto shardRes = ShardType::fromBSON(doc); if (!shardRes.isOK()) { return {shardRes.getStatus().code(), stream() << "Failed to parse shard document " << doc << " due to " << shardRes.getStatus().reason()}; } Status validateStatus = shardRes.getValue().validate(); if (!validateStatus.isOK()) { return {validateStatus.code(), stream() << "Failed to validate shard document " << doc << " due to " << validateStatus.reason()}; } shards.push_back(shardRes.getValue()); } return repl::OpTimeWith>{std::move(shards), findStatus.getValue().opTime}; } bool ShardingCatalogClientImpl::runUserManagementWriteCommand(OperationContext* opCtx, const std::string& commandName, const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { BSONObj cmdToRun = cmdObj; { // Make sure that if the command has a write concern that it is w:1 or w:majority, and // convert w:1 or no write concern to w:majority before sending. 
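        // User management writes must be majority-committed on the config server replica set.
        // A client-supplied w:1 (or an absent write concern) is therefore upgraded to
        // w:'majority' before the command is forwarded; any other write concern is rejected
        // with InvalidOptions.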
WriteConcernOptions writeConcern; writeConcern.reset(); BSONElement writeConcernElement = cmdObj[WriteConcernOptions::kWriteConcernField]; bool initialCmdHadWriteConcern = !writeConcernElement.eoo(); if (initialCmdHadWriteConcern) { Status status = writeConcern.parse(writeConcernElement.Obj()); if (!status.isOK()) { return Command::appendCommandStatus(*result, status); } if (!(writeConcern.wNumNodes == 1 || writeConcern.wMode == WriteConcernOptions::kMajority)) { return Command::appendCommandStatus( *result, {ErrorCodes::InvalidOptions, str::stream() << "Invalid replication write concern. User management write " "commands may only use w:1 or w:'majority', got: " << writeConcern.toBSON()}); } } writeConcern.wMode = WriteConcernOptions::kMajority; writeConcern.wNumNodes = 0; BSONObjBuilder modifiedCmd; if (!initialCmdHadWriteConcern) { modifiedCmd.appendElements(cmdObj); } else { BSONObjIterator cmdObjIter(cmdObj); while (cmdObjIter.more()) { BSONElement e = cmdObjIter.next(); if (WriteConcernOptions::kWriteConcernField == e.fieldName()) { continue; } modifiedCmd.append(e); } } modifiedCmd.append(WriteConcernOptions::kWriteConcernField, writeConcern.toBSON()); cmdToRun = modifiedCmd.obj(); } auto response = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, dbname, cmdToRun, Shard::kDefaultConfigCommandTimeout, Shard::RetryPolicy::kNotIdempotent); if (!response.isOK()) { return Command::appendCommandStatus(*result, response.getStatus()); } if (!response.getValue().commandStatus.isOK()) { return Command::appendCommandStatus(*result, response.getValue().commandStatus); } if (!response.getValue().writeConcernStatus.isOK()) { return Command::appendCommandStatus(*result, response.getValue().writeConcernStatus); } Command::filterCommandReplyForPassthrough(response.getValue().response, result); return true; } bool ShardingCatalogClientImpl::runReadCommandForTest(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { BSONObjBuilder cmdBuilder; cmdBuilder.appendElements(cmdObj); _appendReadConcern(&cmdBuilder); auto resultStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( opCtx, kConfigReadSelector, dbname, cmdBuilder.done(), Shard::RetryPolicy::kIdempotent); if (resultStatus.isOK()) { result->appendElements(resultStatus.getValue().response); return resultStatus.getValue().commandStatus.isOK(); } return Command::appendCommandStatus(*result, resultStatus.getStatus()); } bool ShardingCatalogClientImpl::runUserManagementReadCommand(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { auto resultStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( opCtx, kConfigPrimaryPreferredSelector, dbname, cmdObj, Shard::kDefaultConfigCommandTimeout, Shard::RetryPolicy::kIdempotent); if (resultStatus.isOK()) { Command::filterCommandReplyForPassthrough(resultStatus.getValue().response, result); return resultStatus.getValue().commandStatus.isOK(); } return Command::appendCommandStatus(*result, resultStatus.getStatus()); } Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* opCtx, const BSONArray& updateOps, const BSONArray& preCondition, const std::string& nss, const ChunkVersion& lastChunkVersion, const WriteConcernOptions& writeConcern, repl::ReadConcernLevel readConcern) { invariant(serverGlobalParams.clusterRole == 
ClusterRole::ConfigServer || (readConcern == repl::ReadConcernLevel::kMajorityReadConcern && writeConcern.wMode == WriteConcernOptions::kMajority)); BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition << WriteConcernOptions::kWriteConcernField << writeConcern.toBSON()); auto response = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "config", cmd, Shard::RetryPolicy::kIdempotent); if (!response.isOK()) { return response.getStatus(); } Status status = response.getValue().commandStatus.isOK() ? std::move(response.getValue().writeConcernStatus) : std::move(response.getValue().commandStatus); // TODO (Dianna) This fail point needs to be reexamined when CommitChunkMigration is in: // migrations will no longer be able to exercise it, so split or merge will need to do so. // SERVER-22659. if (MONGO_FAIL_POINT(failApplyChunkOps)) { status = Status(ErrorCodes::InternalError, "Failpoint 'failApplyChunkOps' generated error"); } if (!status.isOK()) { string errMsg; // This could be a blip in the network connectivity. Check if the commit request made it. // // If all the updates were successfully written to the chunks collection, the last // document in the list of updates should be returned from a query to the chunks // collection. The last chunk can be identified by namespace and version number. warning() << "chunk operation commit failed and metadata will be revalidated" << causedBy(redact(status)); // Look for the chunk in this shard whose version got bumped. We assume that if that // mod made it to the config server, then applyOps was successful. std::vector newestChunk; BSONObjBuilder query; lastChunkVersion.addToBSON(query, ChunkType::DEPRECATED_lastmod()); query.append(ChunkType::ns(), nss); Status chunkStatus = getChunks(opCtx, query.obj(), BSONObj(), 1, &newestChunk, nullptr, readConcern); if (!chunkStatus.isOK()) { errMsg = str::stream() << "getChunks function failed, unable to validate chunk " << "operation metadata: " << chunkStatus.toString() << ". applyChunkOpsDeprecated failed to get confirmation " << "of commit. Unable to save chunk ops. Command: " << cmd << ". Result: " << response.getValue().response; } else if (!newestChunk.empty()) { invariant(newestChunk.size() == 1); return Status::OK(); } else { errMsg = str::stream() << "chunk operation commit failed: version " << lastChunkVersion.toString() << " doesn't exist in namespace: " << nss << ". Unable to save chunk ops. Command: " << cmd << ". 
Result: " << response.getValue().response; } return Status(status.code(), errMsg); } return Status::OK(); } DistLockManager* ShardingCatalogClientImpl::getDistLockManager() { invariant(_distLockManager); return _distLockManager.get(); } void ShardingCatalogClientImpl::writeConfigServerDirect(OperationContext* opCtx, const BatchedCommandRequest& batchRequest, BatchedCommandResponse* batchResponse) { // We only support batch sizes of one for config writes if (batchRequest.sizeWriteOps() != 1) { toBatchError(Status(ErrorCodes::InvalidOptions, str::stream() << "Writes to config servers must have batch size of 1, " << "found " << batchRequest.sizeWriteOps()), batchResponse); return; } auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); *batchResponse = configShard->runBatchWriteCommand(opCtx, Shard::kDefaultConfigCommandTimeout, batchRequest, Shard::RetryPolicy::kNotIdempotent); } Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* opCtx, const std::string& ns, const BSONObj& doc, const WriteConcernOptions& writeConcern) { const NamespaceString nss(ns); invariant(nss.db() == "config" || nss.db() == "admin"); const BSONElement idField = doc.getField("_id"); invariant(!idField.eoo()); auto insert(stdx::make_unique()); insert->addToDocuments(doc); BatchedCommandRequest request(insert.release()); request.setNS(nss); request.setWriteConcern(writeConcern.toBSON()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); for (int retry = 1; retry <= kMaxWriteRetry; retry++) { auto response = configShard->runBatchWriteCommand( opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kNoRetry); Status status = response.toStatus(); if (retry < kMaxWriteRetry && configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent)) { // Pretend like the operation is idempotent because we're handling DuplicateKey errors // specially continue; } // If we get DuplicateKey error on the first attempt to insert, this definitively means that // we are trying to insert the same entry a second time, so error out. If it happens on a // retry attempt though, it is not clear whether we are actually inserting a duplicate key // or it is because we failed to wait for write concern on the first attempt. In order to // differentiate, fetch the entry and check. if (retry > 1 && status == ErrorCodes::DuplicateKey) { LOG(1) << "Insert retry failed because of duplicate key error, rechecking."; auto fetchDuplicate = _exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kMajorityReadConcern, nss, idField.wrap(), BSONObj(), boost::none); if (!fetchDuplicate.isOK()) { return fetchDuplicate.getStatus(); } auto existingDocs = fetchDuplicate.getValue().value; if (existingDocs.empty()) { return {ErrorCodes::DuplicateKey, stream() << "DuplicateKey error was returned after a retry attempt, but no " "documents were found. This means a concurrent change occurred " "together with the retries. 
Original error was " << status.toString()}; } invariant(existingDocs.size() == 1); BSONObj existing = std::move(existingDocs.front()); if (existing.woCompare(doc) == 0) { // Documents match, so treat the operation as success return Status::OK(); } } return status; } MONGO_UNREACHABLE; } StatusWith ShardingCatalogClientImpl::updateConfigDocument( OperationContext* opCtx, const string& ns, const BSONObj& query, const BSONObj& update, bool upsert, const WriteConcernOptions& writeConcern) { const NamespaceString nss(ns); invariant(nss.db() == "config"); const BSONElement idField = query.getField("_id"); invariant(!idField.eoo()); unique_ptr updateDoc(new BatchedUpdateDocument()); updateDoc->setQuery(query); updateDoc->setUpdateExpr(update); updateDoc->setUpsert(upsert); updateDoc->setMulti(false); unique_ptr updateRequest(new BatchedUpdateRequest()); updateRequest->addToUpdates(updateDoc.release()); BatchedCommandRequest request(updateRequest.release()); request.setNS(nss); request.setWriteConcern(writeConcern.toBSON()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto response = configShard->runBatchWriteCommand( opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent); Status status = response.toStatus(); if (!status.isOK()) { return status; } const auto nSelected = response.getN(); invariant(nSelected == 0 || nSelected == 1); return (nSelected == 1); } Status ShardingCatalogClientImpl::removeConfigDocuments(OperationContext* opCtx, const string& ns, const BSONObj& query, const WriteConcernOptions& writeConcern) { const NamespaceString nss(ns); invariant(nss.db() == "config"); auto deleteDoc(stdx::make_unique()); deleteDoc->setQuery(query); deleteDoc->setLimit(0); auto deleteRequest(stdx::make_unique()); deleteRequest->addToDeletes(deleteDoc.release()); BatchedCommandRequest request(deleteRequest.release()); request.setNS(nss); request.setWriteConcern(writeConcern.toBSON()); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto response = configShard->runBatchWriteCommand( opCtx, Shard::kDefaultConfigCommandTimeout, request, Shard::RetryPolicy::kIdempotent); return response.toStatus(); } Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* opCtx, const string& dbName, DatabaseType* db) { BSONObjBuilder queryBuilder; queryBuilder.appendRegex( DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i"); auto findStatus = _exhaustiveFindOnConfig(opCtx, kConfigReadSelector, repl::ReadConcernLevel::kMajorityReadConcern, NamespaceString(DatabaseType::ConfigNS), queryBuilder.obj(), BSONObj(), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& docs = findStatus.getValue().value; if (docs.empty()) { return Status::OK(); } BSONObj dbObj = docs.front(); std::string actualDbName = dbObj[DatabaseType::name()].String(); if (actualDbName == dbName) { if (db) { auto parseDBStatus = DatabaseType::fromBSON(dbObj); if (!parseDBStatus.isOK()) { return parseDBStatus.getStatus(); } *db = parseDBStatus.getValue(); } return Status(ErrorCodes::NamespaceExists, str::stream() << "database " << dbName << " already exists"); } return Status(ErrorCodes::DatabaseDifferCase, str::stream() << "can't have 2 databases that just differ on case " << " have: " << actualDbName << " want to add: " << dbName); } Status ShardingCatalogClientImpl::_createCappedConfigCollection( OperationContext* opCtx, StringData collName, int cappedSize, const WriteConcernOptions& writeConcern) { BSONObj 
createCmd = BSON("create" << collName << "capped" << true << "size" << cappedSize << WriteConcernOptions::kWriteConcernField << writeConcern.toBSON()); auto result = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "config", createCmd, Shard::kDefaultConfigCommandTimeout, Shard::RetryPolicy::kIdempotent); if (!result.isOK()) { return result.getStatus(); } if (!result.getValue().commandStatus.isOK()) { if (result.getValue().commandStatus == ErrorCodes::NamespaceExists) { if (result.getValue().writeConcernStatus.isOK()) { return Status::OK(); } else { return result.getValue().writeConcernStatus; } } else { return result.getValue().commandStatus; } } return result.getValue().writeConcernStatus; } StatusWith ShardingCatalogClientImpl::_runCountCommandOnConfig(OperationContext* opCtx, const NamespaceString& ns, BSONObj query) { BSONObjBuilder countBuilder; countBuilder.append("count", ns.coll()); countBuilder.append("query", query); _appendReadConcern(&countBuilder); auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto resultStatus = configShard->runCommandWithFixedRetryAttempts(opCtx, kConfigReadSelector, ns.db().toString(), countBuilder.done(), Shard::kDefaultConfigCommandTimeout, Shard::RetryPolicy::kIdempotent); if (!resultStatus.isOK()) { return resultStatus.getStatus(); } if (!resultStatus.getValue().commandStatus.isOK()) { return resultStatus.getValue().commandStatus; } auto responseObj = std::move(resultStatus.getValue().response); long long result; auto status = bsonExtractIntegerField(responseObj, "n", &result); if (!status.isOK()) { return status; } return result; } StatusWith>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig( OperationContext* opCtx, const ReadPreferenceSetting& readPref, repl::ReadConcernLevel readConcern, const NamespaceString& nss, const BSONObj& query, const BSONObj& sort, boost::optional limit) { auto response = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, readPref, readConcern, nss, query, sort, limit); if (!response.isOK()) { return response.getStatus(); } return repl::OpTimeWith>(std::move(response.getValue().docs), response.getValue().opTime); } void ShardingCatalogClientImpl::_appendReadConcern(BSONObjBuilder* builder) { repl::ReadConcernArgs readConcern(grid.configOpTime(), repl::ReadConcernLevel::kMajorityReadConcern); readConcern.appendInfo(builder); } Status ShardingCatalogClientImpl::appendInfoForConfigServerDatabases( OperationContext* opCtx, const BSONObj& listDatabasesCmd, BSONArrayBuilder* builder) { auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); auto resultStatus = configShard->runCommandWithFixedRetryAttempts(opCtx, kConfigPrimaryPreferredSelector, "admin", listDatabasesCmd, Shard::RetryPolicy::kIdempotent); if (!resultStatus.isOK()) { return resultStatus.getStatus(); } if (!resultStatus.getValue().commandStatus.isOK()) { return resultStatus.getValue().commandStatus; } auto listDBResponse = std::move(resultStatus.getValue().response); BSONElement dbListArray; auto dbListStatus = bsonExtractTypedField(listDBResponse, "databases", Array, &dbListArray); if (!dbListStatus.isOK()) { return dbListStatus; } BSONObjIterator iter(dbListArray.Obj()); while (iter.more()) { auto dbEntry = iter.next().Obj(); string name; auto parseStatus = bsonExtractStringField(dbEntry, "name", &name); if (!parseStatus.isOK()) { return parseStatus; } if (name == "config" || name == 
"admin") { builder->append(dbEntry); } } return Status::OK(); } StatusWith> ShardingCatalogClientImpl::getNewKeys( OperationContext* opCtx, StringData purpose, const LogicalTime& newerThanThis, repl::ReadConcernLevel readConcernLevel) { auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard(); BSONObjBuilder queryBuilder; queryBuilder.append("purpose", purpose); queryBuilder.append("expiresAt", BSON("$gt" << newerThanThis.asTimestamp())); auto findStatus = config->exhaustiveFindOnConfig(opCtx, kConfigReadSelector, readConcernLevel, NamespaceString(KeysCollectionDocument::ConfigNS), queryBuilder.obj(), BSON("expiresAt" << 1), boost::none); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& keyDocs = findStatus.getValue().docs; std::vector keys; for (auto&& keyDoc : keyDocs) { auto parseStatus = KeysCollectionDocument::fromBSON(keyDoc); if (!parseStatus.isOK()) { return parseStatus.getStatus(); } keys.push_back(std::move(parseStatus.getValue())); } return keys; } } // namespace mongo