Diffstat (limited to 'src/mongo/s/catalog/sharding_catalog_client_impl.cpp')
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_client_impl.cpp | 1737 |
1 file changed, 1737 insertions, 0 deletions
diff --git a/src/mongo/s/catalog/sharding_catalog_client_impl.cpp b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp new file mode 100644 index 00000000000..6fd07f23fcb --- /dev/null +++ b/src/mongo/s/catalog/sharding_catalog_client_impl.cpp @@ -0,0 +1,1737 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/sharding_catalog_client_impl.h" + +#include <iomanip> +#include <pcrecpp.h> + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/remote_command_targeter.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/db/audit.h" +#include "mongo/db/client.h" +#include "mongo/db/commands.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/s/type_shard_identity.h" +#include "mongo/executor/network_interface.h" +#include "mongo/executor/task_executor.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/s/catalog/config_server_version.h" +#include "mongo/s/catalog/dist_lock_manager.h" +#include "mongo/s/catalog/type_changelog.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_config_version.h" +#include "mongo/s/catalog/type_database.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog/type_tags.h" +#include "mongo/s/chunk_manager.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_connection.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/config.h" +#include "mongo/s/grid.h" +#include "mongo/s/set_shard_version_request.h" +#include "mongo/s/shard_key_pattern.h" +#include "mongo/s/shard_util.h" 
+#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +MONGO_FP_DECLARE(failApplyChunkOps); +MONGO_FP_DECLARE(setDropCollDistLockWait); + +using repl::OpTime; +using std::set; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; +using str::stream; + +namespace { + +const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); +const ReadPreferenceSetting kConfigPrimaryPreferredSelector(ReadPreference::PrimaryPreferred, + TagSet{}); +const int kMaxReadRetry = 3; +const int kMaxWriteRetry = 3; + +const std::string kActionLogCollectionName("actionlog"); +const int kActionLogCollectionSizeMB = 2 * 1024 * 1024; + +const std::string kChangeLogCollectionName("changelog"); +const int kChangeLogCollectionSizeMB = 10 * 1024 * 1024; + +const NamespaceString kSettingsNamespace("config", "settings"); + +void toBatchError(const Status& status, BatchedCommandResponse* response) { + response->clear(); + response->setErrCode(status.code()); + response->setErrMessage(status.reason()); + response->setOk(false); +} + +} // namespace + +ShardingCatalogClientImpl::ShardingCatalogClientImpl( + std::unique_ptr<DistLockManager> distLockManager) + : _distLockManager(std::move(distLockManager)) {} + +ShardingCatalogClientImpl::~ShardingCatalogClientImpl() = default; + +Status ShardingCatalogClientImpl::startup() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_started) { + return Status::OK(); + } + _started = true; + _distLockManager->startUp(); + return Status::OK(); +} + +void ShardingCatalogClientImpl::shutDown(OperationContext* txn) { + LOG(1) << "ShardingCatalogClientImpl::shutDown() called."; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _inShutdown = true; + } + + invariant(_distLockManager); + _distLockManager->shutDown(txn); +} + +Status ShardingCatalogClientImpl::updateCollection(OperationContext* txn, + const std::string& collNs, + const CollectionType& coll) { + fassert(28634, coll.validate()); + + auto status = updateConfigDocument(txn, + CollectionType::ConfigNS, + BSON(CollectionType::fullNs(collNs)), + coll.toBSON(), + true, + ShardingCatalogClient::kMajorityWriteConcern); + if (!status.isOK()) { + return Status(status.getStatus().code(), + str::stream() << "collection metadata write failed" + << causedBy(status.getStatus())); + } + + return Status::OK(); +} + +Status ShardingCatalogClientImpl::updateDatabase(OperationContext* txn, + const std::string& dbName, + const DatabaseType& db) { + fassert(28616, db.validate()); + + auto status = updateConfigDocument(txn, + DatabaseType::ConfigNS, + BSON(DatabaseType::name(dbName)), + db.toBSON(), + true, + ShardingCatalogClient::kMajorityWriteConcern); + if (!status.isOK()) { + return Status(status.getStatus().code(), + str::stream() << "database metadata write failed" + << causedBy(status.getStatus())); + } + + return Status::OK(); +} + +Status ShardingCatalogClientImpl::createDatabase(OperationContext* txn, const std::string& dbName) { + invariant(nsIsDbOnly(dbName)); + + // The admin and config databases should never be explicitly created. They "just exist", + // i.e. getDatabase will always return an entry for them. 
+ invariant(dbName != "admin"); + invariant(dbName != "config"); + + // Lock the database globally to prevent conflicts with simultaneous database creation. + auto scopedDistLock = getDistLockManager()->lock( + txn, dbName, "createDatabase", DistLockManager::kDefaultLockTimeout); + if (!scopedDistLock.isOK()) { + return scopedDistLock.getStatus(); + } + + // check for case sensitivity violations + Status status = _checkDbDoesNotExist(txn, dbName, nullptr); + if (!status.isOK()) { + return status; + } + + // Database does not exist, pick a shard and create a new entry + auto newShardIdStatus = _selectShardForNewDatabase(txn, grid.shardRegistry()); + if (!newShardIdStatus.isOK()) { + return newShardIdStatus.getStatus(); + } + + const ShardId& newShardId = newShardIdStatus.getValue(); + + log() << "Placing [" << dbName << "] on: " << newShardId; + + DatabaseType db; + db.setName(dbName); + db.setPrimary(newShardId); + db.setSharded(false); + + status = insertConfigDocument( + txn, DatabaseType::ConfigNS, db.toBSON(), ShardingCatalogClient::kMajorityWriteConcern); + if (status.code() == ErrorCodes::DuplicateKey) { + return Status(ErrorCodes::NamespaceExists, "database " + dbName + " already exists"); + } + + return status; +} + +Status ShardingCatalogClientImpl::logAction(OperationContext* txn, + const std::string& what, + const std::string& ns, + const BSONObj& detail) { + if (_actionLogCollectionCreated.load() == 0) { + Status result = _createCappedConfigCollection(txn, + kActionLogCollectionName, + kActionLogCollectionSizeMB, + ShardingCatalogClient::kMajorityWriteConcern); + if (result.isOK()) { + _actionLogCollectionCreated.store(1); + } else { + log() << "couldn't create config.actionlog collection:" << causedBy(result); + return result; + } + } + + return _log(txn, + kActionLogCollectionName, + what, + ns, + detail, + ShardingCatalogClient::kMajorityWriteConcern); +} + +Status ShardingCatalogClientImpl::logChange(OperationContext* txn, + const std::string& what, + const std::string& ns, + const BSONObj& detail, + const WriteConcernOptions& writeConcern) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + writeConcern.wMode == WriteConcernOptions::kMajority); + if (_changeLogCollectionCreated.load() == 0) { + Status result = _createCappedConfigCollection( + txn, kChangeLogCollectionName, kChangeLogCollectionSizeMB, writeConcern); + if (result.isOK()) { + _changeLogCollectionCreated.store(1); + } else { + log() << "couldn't create config.changelog collection:" << causedBy(result); + return result; + } + } + + return _log(txn, kChangeLogCollectionName, what, ns, detail, writeConcern); +} + +// static +StatusWith<ShardId> ShardingCatalogClientImpl::_selectShardForNewDatabase( + OperationContext* txn, ShardRegistry* shardRegistry) { + vector<ShardId> allShardIds; + + shardRegistry->getAllShardIds(&allShardIds); + if (allShardIds.empty()) { + shardRegistry->reload(txn); + shardRegistry->getAllShardIds(&allShardIds); + + if (allShardIds.empty()) { + return Status(ErrorCodes::ShardNotFound, "No shards found"); + } + } + + ShardId candidateShardId = allShardIds[0]; + + auto candidateSizeStatus = shardutil::retrieveTotalShardSize(txn, candidateShardId); + if (!candidateSizeStatus.isOK()) { + return candidateSizeStatus.getStatus(); + } + + for (size_t i = 1; i < allShardIds.size(); i++) { + const ShardId shardId = allShardIds[i]; + + const auto sizeStatus = shardutil::retrieveTotalShardSize(txn, shardId); + if (!sizeStatus.isOK()) { + return sizeStatus.getStatus(); + } + 
+ if (sizeStatus.getValue() < candidateSizeStatus.getValue()) { + candidateSizeStatus = sizeStatus; + candidateShardId = shardId; + } + } + + return candidateShardId; +} + +Status ShardingCatalogClientImpl::enableSharding(OperationContext* txn, const std::string& dbName) { + invariant(nsIsDbOnly(dbName)); + + DatabaseType db; + + // Lock the database globally to prevent conflicts with simultaneous database + // creation/modification. + auto scopedDistLock = getDistLockManager()->lock( + txn, dbName, "enableSharding", DistLockManager::kDefaultLockTimeout); + if (!scopedDistLock.isOK()) { + return scopedDistLock.getStatus(); + } + + // Check for case sensitivity violations + Status status = _checkDbDoesNotExist(txn, dbName, &db); + if (status.isOK()) { + // Database does not exist, create a new entry + auto newShardIdStatus = _selectShardForNewDatabase(txn, grid.shardRegistry()); + if (!newShardIdStatus.isOK()) { + return newShardIdStatus.getStatus(); + } + + const ShardId& newShardId = newShardIdStatus.getValue(); + + log() << "Placing [" << dbName << "] on: " << newShardId; + + db.setName(dbName); + db.setPrimary(newShardId); + db.setSharded(true); + } else if (status.code() == ErrorCodes::NamespaceExists) { + if (db.getSharded()) { + return Status(ErrorCodes::AlreadyInitialized, + str::stream() << "sharding already enabled for database " << dbName); + } + + // Database exists, so just update it + db.setSharded(true); + } else { + return status; + } + + log() << "Enabling sharding for database [" << dbName << "] in config db"; + + return updateDatabase(txn, dbName, db); +} + +Status ShardingCatalogClientImpl::_log(OperationContext* txn, + const StringData& logCollName, + const std::string& what, + const std::string& operationNS, + const BSONObj& detail, + const WriteConcernOptions& writeConcern) { + Date_t now = Grid::get(txn)->getNetwork()->now(); + const std::string hostName = Grid::get(txn)->getNetwork()->getHostName(); + const string changeId = str::stream() << hostName << "-" << now.toString() << "-" << OID::gen(); + + ChangeLogType changeLog; + changeLog.setChangeId(changeId); + changeLog.setServer(hostName); + changeLog.setClientAddr(txn->getClient()->clientAddress(true)); + changeLog.setTime(now); + changeLog.setNS(operationNS); + changeLog.setWhat(what); + changeLog.setDetails(detail); + + BSONObj changeLogBSON = changeLog.toBSON(); + log() << "about to log metadata event into " << logCollName << ": " << redact(changeLogBSON); + + const NamespaceString nss("config", logCollName); + Status result = insertConfigDocument(txn, nss.ns(), changeLogBSON, writeConcern); + + if (!result.isOK()) { + warning() << "Error encountered while logging config change with ID [" << changeId + << "] into collection " << logCollName << ": " << redact(result); + } + + return result; +} + +Status ShardingCatalogClientImpl::shardCollection(OperationContext* txn, + const string& ns, + const ShardKeyPattern& fieldsAndOrder, + const BSONObj& defaultCollation, + bool unique, + const vector<BSONObj>& initPoints, + const set<ShardId>& initShardIds) { + // Lock the collection globally so that no other mongos can try to shard or drop the collection + // at the same time. 
+ auto scopedDistLock = getDistLockManager()->lock( + txn, ns, "shardCollection", DistLockManager::kDefaultLockTimeout); + if (!scopedDistLock.isOK()) { + return scopedDistLock.getStatus(); + } + + auto getDBStatus = getDatabase(txn, nsToDatabase(ns)); + if (!getDBStatus.isOK()) { + return getDBStatus.getStatus(); + } + + ShardId dbPrimaryShardId = getDBStatus.getValue().value.getPrimary(); + const auto primaryShardStatus = grid.shardRegistry()->getShard(txn, dbPrimaryShardId); + if (!primaryShardStatus.isOK()) { + return primaryShardStatus.getStatus(); + } + + { + // In 3.0 and prior we include this extra safety check that the collection is not getting + // sharded concurrently by two different mongos instances. It is not 100%-proof, but it + // reduces the chance that two invocations of shard collection will step on each other's + // toes. Now we take the distributed lock so going forward this check won't be necessary + // but we leave it around for compatibility with other mongoses from 3.0. + // TODO(spencer): Remove this after 3.2 ships. + auto countStatus = _runCountCommandOnConfig( + txn, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::ns(ns))); + if (!countStatus.isOK()) { + return countStatus.getStatus(); + } + if (countStatus.getValue() > 0) { + return Status(ErrorCodes::AlreadyInitialized, + str::stream() << "collection " << ns << " already sharded with " + << countStatus.getValue() + << " chunks."); + } + } + + // Record start in changelog + { + BSONObjBuilder collectionDetail; + collectionDetail.append("shardKey", fieldsAndOrder.toBSON()); + collectionDetail.append("collection", ns); + collectionDetail.append("primary", primaryShardStatus.getValue()->toString()); + + { + BSONArrayBuilder initialShards(collectionDetail.subarrayStart("initShards")); + for (const ShardId& shardId : initShardIds) { + initialShards.append(shardId.toString()); + } + } + + collectionDetail.append("numChunks", static_cast<int>(initPoints.size() + 1)); + + logChange(txn, + "shardCollection.start", + ns, + collectionDetail.obj(), + ShardingCatalogClientImpl::kMajorityWriteConcern); + } + + // Construct the collection default collator. 
+ std::unique_ptr<CollatorInterface> defaultCollator; + if (!defaultCollation.isEmpty()) { + auto statusWithCollator = + CollatorFactoryInterface::get(txn->getServiceContext())->makeFromBSON(defaultCollation); + if (!statusWithCollator.isOK()) { + return statusWithCollator.getStatus(); + } + defaultCollator = std::move(statusWithCollator.getValue()); + } + + shared_ptr<ChunkManager> manager( + new ChunkManager(ns, fieldsAndOrder, std::move(defaultCollator), unique)); + Status createFirstChunksStatus = + manager->createFirstChunks(txn, dbPrimaryShardId, &initPoints, &initShardIds); + if (!createFirstChunksStatus.isOK()) { + return createFirstChunksStatus; + } + manager->loadExistingRanges(txn, nullptr); + + CollectionInfo collInfo; + collInfo.useChunkManager(manager); + collInfo.save(txn, ns); + + // Tell the primary mongod to refresh its data + // TODO: Think the real fix here is for mongos to just + // assume that all collections are sharded, when we get there + SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( + grid.shardRegistry()->getConfigServerConnectionString(), + dbPrimaryShardId, + primaryShardStatus.getValue()->getConnString(), + NamespaceString(ns), + manager->getVersion(), + true); + + auto shardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, dbPrimaryShardId); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); + } + auto shard = shardStatus.getValue(); + + auto ssvResponse = + shard->runCommandWithFixedRetryAttempts(txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + ssv.toBSON(), + Shard::RetryPolicy::kIdempotent); + auto status = ssvResponse.isOK() ? std::move(ssvResponse.getValue().commandStatus) + : std::move(ssvResponse.getStatus()); + if (!status.isOK()) { + warning() << "could not update initial version of " << ns << " on shard primary " + << dbPrimaryShardId << causedBy(status); + } + + logChange(txn, + "shardCollection.end", + ns, + BSON("version" << manager->getVersion().toString()), + ShardingCatalogClientImpl::kMajorityWriteConcern); + + return Status::OK(); +} + +StatusWith<ShardDrainingStatus> ShardingCatalogClientImpl::removeShard(OperationContext* txn, + const ShardId& shardId) { + // Check preconditions for removing the shard + string name = shardId.toString(); + auto countStatus = _runCountCommandOnConfig( + txn, + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << NE << name << ShardType::draining(true))); + if (!countStatus.isOK()) { + return countStatus.getStatus(); + } + if (countStatus.getValue() > 0) { + return Status(ErrorCodes::ConflictingOperationInProgress, + "Can't have more than one draining shard at a time"); + } + + countStatus = _runCountCommandOnConfig( + txn, NamespaceString(ShardType::ConfigNS), BSON(ShardType::name() << NE << name)); + if (!countStatus.isOK()) { + return countStatus.getStatus(); + } + if (countStatus.getValue() == 0) { + return Status(ErrorCodes::IllegalOperation, "Can't remove last shard"); + } + + // Figure out if shard is already draining + countStatus = + _runCountCommandOnConfig(txn, + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::name() << name << ShardType::draining(true))); + if (!countStatus.isOK()) { + return countStatus.getStatus(); + } + + if (countStatus.getValue() == 0) { + log() << "going to start draining shard: " << name; + + auto updateStatus = updateConfigDocument(txn, + ShardType::ConfigNS, + BSON(ShardType::name() << name), + BSON("$set" << BSON(ShardType::draining(true))), + false, + 
ShardingCatalogClient::kMajorityWriteConcern); + if (!updateStatus.isOK()) { + log() << "error starting removeShard: " << name << causedBy(updateStatus.getStatus()); + return updateStatus.getStatus(); + } + + grid.shardRegistry()->reload(txn); + + // Record start in changelog + logChange(txn, + "removeShard.start", + "", + BSON("shard" << name), + ShardingCatalogClientImpl::kMajorityWriteConcern); + return ShardDrainingStatus::STARTED; + } + + // Draining has already started, now figure out how many chunks and databases are still on the + // shard. + countStatus = _runCountCommandOnConfig( + txn, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::shard(name))); + if (!countStatus.isOK()) { + return countStatus.getStatus(); + } + const long long chunkCount = countStatus.getValue(); + + countStatus = _runCountCommandOnConfig( + txn, NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::primary(name))); + if (!countStatus.isOK()) { + return countStatus.getStatus(); + } + const long long databaseCount = countStatus.getValue(); + + if (chunkCount > 0 || databaseCount > 0) { + // Still more draining to do + return ShardDrainingStatus::ONGOING; + } + + // Draining is done, now finish removing the shard. + log() << "going to remove shard: " << name; + audit::logRemoveShard(txn->getClient(), name); + + Status status = removeConfigDocuments(txn, + ShardType::ConfigNS, + BSON(ShardType::name() << name), + ShardingCatalogClient::kMajorityWriteConcern); + if (!status.isOK()) { + log() << "Error concluding removeShard operation on: " << name + << "; err: " << status.reason(); + return status; + } + + shardConnectionPool.removeHost(name); + ReplicaSetMonitor::remove(name); + + grid.shardRegistry()->reload(txn); + + // Record finish in changelog + logChange(txn, + "removeShard", + "", + BSON("shard" << name), + ShardingCatalogClientImpl::kMajorityWriteConcern); + + return ShardDrainingStatus::COMPLETED; +} + +StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::getDatabase( + OperationContext* txn, const std::string& dbName) { + if (!NamespaceString::validDBName(dbName, NamespaceString::DollarInDbNameBehavior::Allow)) { + return {ErrorCodes::InvalidNamespace, stream() << dbName << " is not a valid db name"}; + } + + // The two databases that are hosted on the config server are config and admin + if (dbName == "config" || dbName == "admin") { + DatabaseType dbt; + dbt.setName(dbName); + dbt.setSharded(false); + dbt.setPrimary(ShardId("config")); + + return repl::OpTimeWith<DatabaseType>(dbt); + } + + auto result = _fetchDatabaseMetadata(txn, dbName, kConfigReadSelector); + if (result == ErrorCodes::NamespaceNotFound) { + // If we failed to find the database metadata on the 'nearest' config server, try again + // against the primary, in case the database was recently created. 
+ result = + _fetchDatabaseMetadata(txn, dbName, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); + if (!result.isOK() && (result != ErrorCodes::NamespaceNotFound)) { + return Status(result.getStatus().code(), + str::stream() << "Could not confirm non-existence of database \"" + << dbName + << "\" due to inability to query the config server primary" + << causedBy(result.getStatus())); + } + } + + return result; +} + +StatusWith<repl::OpTimeWith<DatabaseType>> ShardingCatalogClientImpl::_fetchDatabaseMetadata( + OperationContext* txn, const std::string& dbName, const ReadPreferenceSetting& readPref) { + dassert(dbName != "admin" && dbName != "config"); + + auto findStatus = _exhaustiveFindOnConfig(txn, + readPref, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::name(dbName)), + BSONObj(), + 1); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& docsWithOpTime = findStatus.getValue(); + if (docsWithOpTime.value.empty()) { + return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"}; + } + + invariant(docsWithOpTime.value.size() == 1); + + auto parseStatus = DatabaseType::fromBSON(docsWithOpTime.value.front()); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + return repl::OpTimeWith<DatabaseType>(parseStatus.getValue(), docsWithOpTime.opTime); +} + +StatusWith<repl::OpTimeWith<CollectionType>> ShardingCatalogClientImpl::getCollection( + OperationContext* txn, const std::string& collNs) { + auto statusFind = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(CollectionType::ConfigNS), + BSON(CollectionType::fullNs(collNs)), + BSONObj(), + 1); + if (!statusFind.isOK()) { + return statusFind.getStatus(); + } + + const auto& retOpTimePair = statusFind.getValue(); + const auto& retVal = retOpTimePair.value; + if (retVal.empty()) { + return Status(ErrorCodes::NamespaceNotFound, + stream() << "collection " << collNs << " not found"); + } + + invariant(retVal.size() == 1); + + auto parseStatus = CollectionType::fromBSON(retVal.front()); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + return repl::OpTimeWith<CollectionType>(parseStatus.getValue(), retOpTimePair.opTime); +} + +Status ShardingCatalogClientImpl::getCollections(OperationContext* txn, + const std::string* dbName, + std::vector<CollectionType>* collections, + OpTime* opTime) { + BSONObjBuilder b; + if (dbName) { + invariant(!dbName->empty()); + b.appendRegex(CollectionType::fullNs(), + string(str::stream() << "^" << pcrecpp::RE::QuoteMeta(*dbName) << "\\.")); + } + + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(CollectionType::ConfigNS), + b.obj(), + BSONObj(), + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& docsOpTimePair = findStatus.getValue(); + + for (const BSONObj& obj : docsOpTimePair.value) { + const auto collectionResult = CollectionType::fromBSON(obj); + if (!collectionResult.isOK()) { + collections->clear(); + return {ErrorCodes::FailedToParse, + str::stream() << "error while parsing " << CollectionType::ConfigNS + << " document: " + << obj + << " : " + << collectionResult.getStatus().toString()}; + } + + collections->push_back(collectionResult.getValue()); + } + + if (opTime) { + *opTime = docsOpTimePair.opTime; + } + + return Status::OK(); +} 
+ +Status ShardingCatalogClientImpl::dropCollection(OperationContext* txn, const NamespaceString& ns) { + logChange(txn, + "dropCollection.start", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern); + + auto shardsStatus = getAllShards(txn, repl::ReadConcernLevel::kMajorityReadConcern); + if (!shardsStatus.isOK()) { + return shardsStatus.getStatus(); + } + vector<ShardType> allShards = std::move(shardsStatus.getValue().value); + + LOG(1) << "dropCollection " << ns << " started"; + + // Lock the collection globally so that split/migrate cannot run + Seconds waitFor(DistLockManager::kDefaultLockTimeout); + MONGO_FAIL_POINT_BLOCK(setDropCollDistLockWait, customWait) { + const BSONObj& data = customWait.getData(); + waitFor = Seconds(data["waitForSecs"].numberInt()); + } + + auto scopedDistLock = getDistLockManager()->lock(txn, ns.ns(), "drop", waitFor); + if (!scopedDistLock.isOK()) { + return scopedDistLock.getStatus(); + } + + LOG(1) << "dropCollection " << ns << " locked"; + + std::map<string, BSONObj> errors; + auto* shardRegistry = grid.shardRegistry(); + + for (const auto& shardEntry : allShards) { + auto shardStatus = shardRegistry->getShard(txn, shardEntry.getName()); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); + } + auto dropResult = shardStatus.getValue()->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + ns.db().toString(), + BSON("drop" << ns.coll() << WriteConcernOptions::kWriteConcernField + << txn->getWriteConcern().toBSON()), + Shard::RetryPolicy::kIdempotent); + + if (!dropResult.isOK()) { + return Status(dropResult.getStatus().code(), + dropResult.getStatus().reason() + " at " + shardEntry.getName()); + } + + auto dropStatus = std::move(dropResult.getValue().commandStatus); + auto wcStatus = std::move(dropResult.getValue().writeConcernStatus); + if (!dropStatus.isOK() || !wcStatus.isOK()) { + if (dropStatus.code() == ErrorCodes::NamespaceNotFound && wcStatus.isOK()) { + // Generally getting NamespaceNotFound is okay to ignore as it simply means that + // the collection has already been dropped or doesn't exist on this shard. + // If, however, we get NamespaceNotFound but also have a write concern error then we + // can't confirm whether the fact that the namespace doesn't exist is actually + // committed. Thus we must still fail on NamespaceNotFound if there is also a write + // concern error. This can happen if we call drop, it succeeds but with a write + // concern error, then we retry the drop. 
+ continue; + } + + errors.emplace(shardEntry.getHost(), std::move(dropResult.getValue().response)); + } + } + + if (!errors.empty()) { + StringBuilder sb; + sb << "Dropping collection failed on the following hosts: "; + + for (auto it = errors.cbegin(); it != errors.cend(); ++it) { + if (it != errors.cbegin()) { + sb << ", "; + } + + sb << it->first << ": " << it->second; + } + + return {ErrorCodes::OperationFailed, sb.str()}; + } + + LOG(1) << "dropCollection " << ns << " shard data deleted"; + + // Remove chunk data + Status result = removeConfigDocuments(txn, + ChunkType::ConfigNS, + BSON(ChunkType::ns(ns.ns())), + ShardingCatalogClient::kMajorityWriteConcern); + if (!result.isOK()) { + return result; + } + + LOG(1) << "dropCollection " << ns << " chunk data deleted"; + + // Mark the collection as dropped + CollectionType coll; + coll.setNs(ns); + coll.setDropped(true); + coll.setEpoch(ChunkVersion::DROPPED().epoch()); + coll.setUpdatedAt(Grid::get(txn)->getNetwork()->now()); + + result = updateCollection(txn, ns.ns(), coll); + if (!result.isOK()) { + return result; + } + + LOG(1) << "dropCollection " << ns << " collection marked as dropped"; + + for (const auto& shardEntry : allShards) { + SetShardVersionRequest ssv = SetShardVersionRequest::makeForVersioningNoPersist( + grid.shardRegistry()->getConfigServerConnectionString(), + shardEntry.getName(), + fassertStatusOK(28781, ConnectionString::parse(shardEntry.getHost())), + ns, + ChunkVersion::DROPPED(), + true); + + auto shardStatus = shardRegistry->getShard(txn, shardEntry.getName()); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); + } + auto shard = shardStatus.getValue(); + + auto ssvResult = shard->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + ssv.toBSON(), + Shard::RetryPolicy::kIdempotent); + + if (!ssvResult.isOK()) { + return ssvResult.getStatus(); + } + + auto ssvStatus = std::move(ssvResult.getValue().commandStatus); + if (!ssvStatus.isOK()) { + return ssvStatus; + } + + auto unsetShardingStatus = shard->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + BSON("unsetSharding" << 1), + Shard::RetryPolicy::kIdempotent); + + if (!unsetShardingStatus.isOK()) { + return unsetShardingStatus.getStatus(); + } + + auto unsetShardingResult = std::move(unsetShardingStatus.getValue().commandStatus); + if (!unsetShardingResult.isOK()) { + return unsetShardingResult; + } + } + + LOG(1) << "dropCollection " << ns << " completed"; + + logChange(txn, + "dropCollection", + ns.ns(), + BSONObj(), + ShardingCatalogClientImpl::kMajorityWriteConcern); + + return Status::OK(); +} + +StatusWith<BSONObj> ShardingCatalogClientImpl::getGlobalSettings(OperationContext* txn, + StringData key) { + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + kSettingsNamespace, + BSON("_id" << key), + BSONObj(), + 1); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& docs = findStatus.getValue().value; + if (docs.empty()) { + return {ErrorCodes::NoMatchingDocument, + str::stream() << "can't find settings document with key: " << key}; + } + + invariant(docs.size() == 1); + return docs.front(); +} + +StatusWith<VersionType> ShardingCatalogClientImpl::getConfigVersion( + OperationContext* txn, repl::ReadConcernLevel readConcern) { + auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + 
kConfigReadSelector, + readConcern, + NamespaceString(VersionType::ConfigNS), + BSONObj(), + BSONObj(), + boost::none /* no limit */); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + auto queryResults = findStatus.getValue().docs; + + if (queryResults.size() > 1) { + return {ErrorCodes::TooManyMatchingDocuments, + str::stream() << "should only have 1 document in " << VersionType::ConfigNS}; + } + + if (queryResults.empty()) { + VersionType versionInfo; + versionInfo.setMinCompatibleVersion(UpgradeHistory_EmptyVersion); + versionInfo.setCurrentVersion(UpgradeHistory_EmptyVersion); + versionInfo.setClusterId(OID{}); + return versionInfo; + } + + BSONObj versionDoc = queryResults.front(); + auto versionTypeResult = VersionType::fromBSON(versionDoc); + if (!versionTypeResult.isOK()) { + return Status(ErrorCodes::UnsupportedFormat, + str::stream() << "invalid config.version document: " << versionDoc + << causedBy(versionTypeResult.getStatus())); + } + + auto validationStatus = versionTypeResult.getValue().validate(); + if (!validationStatus.isOK()) { + return Status(validationStatus.code(), + str::stream() << "invalid config.version document: " << versionDoc + << causedBy(validationStatus.reason())); + } + + return versionTypeResult.getValue(); +} + +Status ShardingCatalogClientImpl::getDatabasesForShard(OperationContext* txn, + const ShardId& shardId, + vector<string>* dbs) { + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(DatabaseType::ConfigNS), + BSON(DatabaseType::primary(shardId.toString())), + BSONObj(), + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + for (const BSONObj& obj : findStatus.getValue().value) { + string dbName; + Status status = bsonExtractStringField(obj, DatabaseType::name(), &dbName); + if (!status.isOK()) { + dbs->clear(); + return status; + } + + dbs->push_back(dbName); + } + + return Status::OK(); +} + +Status ShardingCatalogClientImpl::getChunks(OperationContext* txn, + const BSONObj& query, + const BSONObj& sort, + boost::optional<int> limit, + vector<ChunkType>* chunks, + OpTime* opTime, + repl::ReadConcernLevel readConcern) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + readConcern == repl::ReadConcernLevel::kMajorityReadConcern); + chunks->clear(); + + // Convert boost::optional<int> to boost::optional<long long>. + auto longLimit = limit ? 
boost::optional<long long>(*limit) : boost::none; + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + readConcern, + NamespaceString(ChunkType::ConfigNS), + query, + sort, + longLimit); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto chunkDocsOpTimePair = findStatus.getValue(); + for (const BSONObj& obj : chunkDocsOpTimePair.value) { + auto chunkRes = ChunkType::fromBSON(obj); + if (!chunkRes.isOK()) { + chunks->clear(); + return {ErrorCodes::FailedToParse, + stream() << "Failed to parse chunk with id (" + << obj[ChunkType::name()].toString() + << "): " + << chunkRes.getStatus().toString()}; + } + + chunks->push_back(chunkRes.getValue()); + } + + if (opTime) { + *opTime = chunkDocsOpTimePair.opTime; + } + + return Status::OK(); +} + +Status ShardingCatalogClientImpl::getTagsForCollection(OperationContext* txn, + const std::string& collectionNs, + std::vector<TagsType>* tags) { + tags->clear(); + + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(TagsType::ConfigNS), + BSON(TagsType::ns(collectionNs)), + BSON(TagsType::min() << 1), + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + for (const BSONObj& obj : findStatus.getValue().value) { + auto tagRes = TagsType::fromBSON(obj); + if (!tagRes.isOK()) { + tags->clear(); + return Status(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse tag: " + << tagRes.getStatus().toString()); + } + + tags->push_back(tagRes.getValue()); + } + + return Status::OK(); +} + +StatusWith<string> ShardingCatalogClientImpl::getTagForChunk(OperationContext* txn, + const std::string& collectionNs, + const ChunkType& chunk) { + BSONObj query = + BSON(TagsType::ns(collectionNs) << TagsType::min() << BSON("$lte" << chunk.getMin()) + << TagsType::max() + << BSON("$gte" << chunk.getMax())); + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(TagsType::ConfigNS), + query, + BSONObj(), + 1); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& docs = findStatus.getValue().value; + if (docs.empty()) { + return string{}; + } + + invariant(docs.size() == 1); + BSONObj tagsDoc = docs.front(); + + const auto tagsResult = TagsType::fromBSON(tagsDoc); + if (!tagsResult.isOK()) { + return {ErrorCodes::FailedToParse, + stream() << "error while parsing " << TagsType::ConfigNS << " document: " << tagsDoc + << " : " + << tagsResult.getStatus().toString()}; + } + return tagsResult.getValue().getTag(); +} + +StatusWith<repl::OpTimeWith<std::vector<ShardType>>> ShardingCatalogClientImpl::getAllShards( + OperationContext* txn, repl::ReadConcernLevel readConcern) { + std::vector<ShardType> shards; + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + readConcern, + NamespaceString(ShardType::ConfigNS), + BSONObj(), // no query filter + BSONObj(), // no sort + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + for (const BSONObj& doc : findStatus.getValue().value) { + auto shardRes = ShardType::fromBSON(doc); + if (!shardRes.isOK()) { + return {ErrorCodes::FailedToParse, + stream() << "Failed to parse shard " << causedBy(shardRes.getStatus()) << doc}; + } + + Status validateStatus = shardRes.getValue().validate(); + if (!validateStatus.isOK()) { + return {validateStatus.code(), + stream() << "Failed to validate shard " << 
causedBy(validateStatus) << doc}; + } + + shards.push_back(shardRes.getValue()); + } + + return repl::OpTimeWith<std::vector<ShardType>>{std::move(shards), + findStatus.getValue().opTime}; +} + +bool ShardingCatalogClientImpl::runUserManagementWriteCommand(OperationContext* txn, + const std::string& commandName, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + BSONObj cmdToRun = cmdObj; + { + // Make sure that if the command has a write concern that it is w:1 or w:majority, and + // convert w:1 or no write concern to w:majority before sending. + WriteConcernOptions writeConcern; + writeConcern.reset(); + + BSONElement writeConcernElement = cmdObj[WriteConcernOptions::kWriteConcernField]; + bool initialCmdHadWriteConcern = !writeConcernElement.eoo(); + if (initialCmdHadWriteConcern) { + Status status = writeConcern.parse(writeConcernElement.Obj()); + if (!status.isOK()) { + return Command::appendCommandStatus(*result, status); + } + + if (!(writeConcern.wNumNodes == 1 || + writeConcern.wMode == WriteConcernOptions::kMajority)) { + return Command::appendCommandStatus( + *result, + {ErrorCodes::InvalidOptions, + str::stream() << "Invalid replication write concern. User management write " + "commands may only use w:1 or w:'majority', got: " + << writeConcern.toBSON()}); + } + } + + writeConcern.wMode = WriteConcernOptions::kMajority; + writeConcern.wNumNodes = 0; + + BSONObjBuilder modifiedCmd; + if (!initialCmdHadWriteConcern) { + modifiedCmd.appendElements(cmdObj); + } else { + BSONObjIterator cmdObjIter(cmdObj); + while (cmdObjIter.more()) { + BSONElement e = cmdObjIter.next(); + if (WriteConcernOptions::kWriteConcernField == e.fieldName()) { + continue; + } + modifiedCmd.append(e); + } + } + modifiedCmd.append(WriteConcernOptions::kWriteConcernField, writeConcern.toBSON()); + cmdToRun = modifiedCmd.obj(); + } + + auto response = + Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + dbname, + cmdToRun, + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kNotIdempotent); + + if (!response.isOK()) { + return Command::appendCommandStatus(*result, response.getStatus()); + } + if (!response.getValue().commandStatus.isOK()) { + return Command::appendCommandStatus(*result, response.getValue().commandStatus); + } + if (!response.getValue().writeConcernStatus.isOK()) { + return Command::appendCommandStatus(*result, response.getValue().writeConcernStatus); + } + + result->appendElements(response.getValue().response); + return true; +} + +bool ShardingCatalogClientImpl::runReadCommandForTest(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + BSONObjBuilder cmdBuilder; + cmdBuilder.appendElements(cmdObj); + _appendReadConcern(&cmdBuilder); + + auto resultStatus = + Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + txn, kConfigReadSelector, dbname, cmdBuilder.done(), Shard::RetryPolicy::kIdempotent); + if (resultStatus.isOK()) { + result->appendElements(resultStatus.getValue().response); + return resultStatus.getValue().commandStatus.isOK(); + } + + return Command::appendCommandStatus(*result, resultStatus.getStatus()); +} + +bool ShardingCatalogClientImpl::runUserManagementReadCommand(OperationContext* txn, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + auto resultStatus = + 
Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + txn, + kConfigPrimaryPreferredSelector, + dbname, + cmdObj, + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + if (resultStatus.isOK()) { + result->appendElements(resultStatus.getValue().response); + return resultStatus.getValue().commandStatus.isOK(); + } + + return Command::appendCommandStatus(*result, resultStatus.getStatus()); +} + +Status ShardingCatalogClientImpl::applyChunkOpsDeprecated(OperationContext* txn, + const BSONArray& updateOps, + const BSONArray& preCondition, + const std::string& nss, + const ChunkVersion& lastChunkVersion, + const WriteConcernOptions& writeConcern, + repl::ReadConcernLevel readConcern) { + invariant(serverGlobalParams.clusterRole == ClusterRole::ConfigServer || + (readConcern == repl::ReadConcernLevel::kMajorityReadConcern && + writeConcern.wMode == WriteConcernOptions::kMajority)); + BSONObj cmd = BSON("applyOps" << updateOps << "preCondition" << preCondition + << WriteConcernOptions::kWriteConcernField + << writeConcern.toBSON()); + + auto response = + Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "config", + cmd, + Shard::RetryPolicy::kIdempotent); + + if (!response.isOK()) { + return response.getStatus(); + } + + Status status = response.getValue().commandStatus.isOK() + ? std::move(response.getValue().writeConcernStatus) + : std::move(response.getValue().commandStatus); + + // TODO (Dianna) This fail point needs to be reexamined when CommitChunkMigration is in: + // migrations will no longer be able to exercise it, so split or merge will need to do so. + // SERVER-22659. + if (MONGO_FAIL_POINT(failApplyChunkOps)) { + status = Status(ErrorCodes::InternalError, "Failpoint 'failApplyChunkOps' generated error"); + } + + if (!status.isOK()) { + string errMsg; + + // This could be a blip in the network connectivity. Check if the commit request made it. + // + // If all the updates were successfully written to the chunks collection, the last + // document in the list of updates should be returned from a query to the chunks + // collection. The last chunk can be identified by namespace and version number. + + warning() << "chunk operation commit failed and metadata will be revalidated" + << causedBy(redact(status)); + + // Look for the chunk in this shard whose version got bumped. We assume that if that + // mod made it to the config server, then applyOps was successful. + std::vector<ChunkType> newestChunk; + BSONObjBuilder query; + lastChunkVersion.addToBSON(query, ChunkType::DEPRECATED_lastmod()); + query.append(ChunkType::ns(), nss); + Status chunkStatus = + getChunks(txn, query.obj(), BSONObj(), 1, &newestChunk, nullptr, readConcern); + + if (!chunkStatus.isOK()) { + warning() << "getChunks function failed, unable to validate chunk operation metadata" + << causedBy(redact(chunkStatus)); + errMsg = str::stream() + << "getChunks function failed, unable to validate chunk " + << "operation metadata: " << causedBy(redact(chunkStatus)) + << ". applyChunkOpsDeprecated failed to get confirmation " + << "of commit. Unable to save chunk ops. Command: " << redact(cmd) + << ". 
Result: " << redact(response.getValue().response); + } else if (!newestChunk.empty()) { + invariant(newestChunk.size() == 1); + log() << "chunk operation commit confirmed"; + return Status::OK(); + } else { + errMsg = str::stream() << "chunk operation commit failed: version " + << lastChunkVersion.toString() << " doesn't exist in namespace" + << nss << ". Unable to save chunk ops. Command: " << cmd + << ". Result: " << response.getValue().response; + } + return Status(status.code(), errMsg); + } + + return Status::OK(); +} + +DistLockManager* ShardingCatalogClientImpl::getDistLockManager() { + invariant(_distLockManager); + return _distLockManager.get(); +} + +void ShardingCatalogClientImpl::writeConfigServerDirect(OperationContext* txn, + const BatchedCommandRequest& batchRequest, + BatchedCommandResponse* batchResponse) { + // We only support batch sizes of one for config writes + if (batchRequest.sizeWriteOps() != 1) { + toBatchError(Status(ErrorCodes::InvalidOptions, + str::stream() << "Writes to config servers must have batch size of 1, " + << "found " + << batchRequest.sizeWriteOps()), + batchResponse); + return; + } + + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + *batchResponse = configShard->runBatchWriteCommandOnConfig( + txn, batchRequest, Shard::RetryPolicy::kNotIdempotent); +} + +Status ShardingCatalogClientImpl::insertConfigDocument(OperationContext* txn, + const std::string& ns, + const BSONObj& doc, + const WriteConcernOptions& writeConcern) { + const NamespaceString nss(ns); + invariant(nss.db() == "config"); + + const BSONElement idField = doc.getField("_id"); + invariant(!idField.eoo()); + + auto insert(stdx::make_unique<BatchedInsertRequest>()); + insert->addToDocuments(doc); + + BatchedCommandRequest request(insert.release()); + request.setNS(nss); + request.setWriteConcern(writeConcern.toBSON()); + + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + for (int retry = 1; retry <= kMaxWriteRetry; retry++) { + auto response = + configShard->runBatchWriteCommandOnConfig(txn, request, Shard::RetryPolicy::kNoRetry); + + Status status = response.toStatus(); + + if (retry < kMaxWriteRetry && + configShard->isRetriableError(status.code(), Shard::RetryPolicy::kIdempotent)) { + // Pretend like the operation is idempotent because we're handling DuplicateKey errors + // specially + continue; + } + + // If we get DuplicateKey error on the first attempt to insert, this definitively means that + // we are trying to insert the same entry a second time, so error out. If it happens on a + // retry attempt though, it is not clear whether we are actually inserting a duplicate key + // or it is because we failed to wait for write concern on the first attempt. In order to + // differentiate, fetch the entry and check. + if (retry > 1 && status == ErrorCodes::DuplicateKey) { + LOG(1) << "Insert retry failed because of duplicate key error, rechecking."; + + auto fetchDuplicate = + _exhaustiveFindOnConfig(txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + nss, + idField.wrap(), + BSONObj(), + boost::none); + if (!fetchDuplicate.isOK()) { + return fetchDuplicate.getStatus(); + } + + auto existingDocs = fetchDuplicate.getValue().value; + if (existingDocs.empty()) { + return Status(ErrorCodes::DuplicateKey, + stream() << "DuplicateKey error" << causedBy(status) + << " was returned after a retry attempt, but no documents " + "were found. 
This means a concurrent change occurred " + "together with the retries."); + } + + invariant(existingDocs.size() == 1); + + BSONObj existing = std::move(existingDocs.front()); + if (existing.woCompare(doc) == 0) { + // Documents match, so treat the operation as success + return Status::OK(); + } + } + + return status; + } + + MONGO_UNREACHABLE; +} + +StatusWith<bool> ShardingCatalogClientImpl::updateConfigDocument( + OperationContext* txn, + const string& ns, + const BSONObj& query, + const BSONObj& update, + bool upsert, + const WriteConcernOptions& writeConcern) { + const NamespaceString nss(ns); + invariant(nss.db() == "config"); + + const BSONElement idField = query.getField("_id"); + invariant(!idField.eoo()); + + unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); + updateDoc->setQuery(query); + updateDoc->setUpdateExpr(update); + updateDoc->setUpsert(upsert); + updateDoc->setMulti(false); + + unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); + updateRequest->addToUpdates(updateDoc.release()); + + BatchedCommandRequest request(updateRequest.release()); + request.setNS(nss); + request.setWriteConcern(writeConcern.toBSON()); + + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + auto response = + configShard->runBatchWriteCommandOnConfig(txn, request, Shard::RetryPolicy::kIdempotent); + + Status status = response.toStatus(); + if (!status.isOK()) { + return status; + } + + const auto nSelected = response.getN(); + invariant(nSelected == 0 || nSelected == 1); + return (nSelected == 1); +} + +Status ShardingCatalogClientImpl::removeConfigDocuments(OperationContext* txn, + const string& ns, + const BSONObj& query, + const WriteConcernOptions& writeConcern) { + const NamespaceString nss(ns); + invariant(nss.db() == "config"); + + auto deleteDoc(stdx::make_unique<BatchedDeleteDocument>()); + deleteDoc->setQuery(query); + deleteDoc->setLimit(0); + + auto deleteRequest(stdx::make_unique<BatchedDeleteRequest>()); + deleteRequest->addToDeletes(deleteDoc.release()); + + BatchedCommandRequest request(deleteRequest.release()); + request.setNS(nss); + request.setWriteConcern(writeConcern.toBSON()); + + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + auto response = + configShard->runBatchWriteCommandOnConfig(txn, request, Shard::RetryPolicy::kIdempotent); + + return response.toStatus(); +} + +Status ShardingCatalogClientImpl::_checkDbDoesNotExist(OperationContext* txn, + const string& dbName, + DatabaseType* db) { + BSONObjBuilder queryBuilder; + queryBuilder.appendRegex( + DatabaseType::name(), (string) "^" + pcrecpp::RE::QuoteMeta(dbName) + "$", "i"); + + auto findStatus = _exhaustiveFindOnConfig(txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(DatabaseType::ConfigNS), + queryBuilder.obj(), + BSONObj(), + 1); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& docs = findStatus.getValue().value; + if (docs.empty()) { + return Status::OK(); + } + + BSONObj dbObj = docs.front(); + std::string actualDbName = dbObj[DatabaseType::name()].String(); + if (actualDbName == dbName) { + if (db) { + auto parseDBStatus = DatabaseType::fromBSON(dbObj); + if (!parseDBStatus.isOK()) { + return parseDBStatus.getStatus(); + } + + *db = parseDBStatus.getValue(); + } + + return Status(ErrorCodes::NamespaceExists, + str::stream() << "database " << dbName << " already exists"); + } + + return Status(ErrorCodes::DatabaseDifferCase, + str::stream() << 
"can't have 2 databases that just differ on case " + << " have: " + << actualDbName + << " want to add: " + << dbName); +} + +Status ShardingCatalogClientImpl::_createCappedConfigCollection( + OperationContext* txn, + StringData collName, + int cappedSize, + const WriteConcernOptions& writeConcern) { + BSONObj createCmd = BSON("create" << collName << "capped" << true << "size" << cappedSize + << WriteConcernOptions::kWriteConcernField + << writeConcern.toBSON()); + + auto result = + Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "config", + createCmd, + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + + if (!result.isOK()) { + return result.getStatus(); + } + + if (!result.getValue().commandStatus.isOK()) { + if (result.getValue().commandStatus == ErrorCodes::NamespaceExists) { + if (result.getValue().writeConcernStatus.isOK()) { + return Status::OK(); + } else { + return result.getValue().writeConcernStatus; + } + } else { + return result.getValue().commandStatus; + } + } + + return result.getValue().writeConcernStatus; +} + +StatusWith<long long> ShardingCatalogClientImpl::_runCountCommandOnConfig(OperationContext* txn, + const NamespaceString& ns, + BSONObj query) { + BSONObjBuilder countBuilder; + countBuilder.append("count", ns.coll()); + countBuilder.append("query", query); + _appendReadConcern(&countBuilder); + + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + auto resultStatus = + configShard->runCommandWithFixedRetryAttempts(txn, + kConfigReadSelector, + ns.db().toString(), + countBuilder.done(), + Shard::kDefaultConfigCommandTimeout, + Shard::RetryPolicy::kIdempotent); + if (!resultStatus.isOK()) { + return resultStatus.getStatus(); + } + if (!resultStatus.getValue().commandStatus.isOK()) { + return resultStatus.getValue().commandStatus; + } + + auto responseObj = std::move(resultStatus.getValue().response); + + long long result; + auto status = bsonExtractIntegerField(responseObj, "n", &result); + if (!status.isOK()) { + return status; + } + + return result; +} + +StatusWith<repl::OpTimeWith<vector<BSONObj>>> ShardingCatalogClientImpl::_exhaustiveFindOnConfig( + OperationContext* txn, + const ReadPreferenceSetting& readPref, + repl::ReadConcernLevel readConcern, + const NamespaceString& nss, + const BSONObj& query, + const BSONObj& sort, + boost::optional<long long> limit) { + auto response = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, readPref, readConcern, nss, query, sort, limit); + if (!response.isOK()) { + return response.getStatus(); + } + + return repl::OpTimeWith<vector<BSONObj>>(std::move(response.getValue().docs), + response.getValue().opTime); +} + +void ShardingCatalogClientImpl::_appendReadConcern(BSONObjBuilder* builder) { + repl::ReadConcernArgs readConcern(grid.configOpTime(), + repl::ReadConcernLevel::kMajorityReadConcern); + readConcern.appendInfo(builder); +} + +Status ShardingCatalogClientImpl::appendInfoForConfigServerDatabases(OperationContext* txn, + BSONArrayBuilder* builder) { + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + auto resultStatus = + configShard->runCommandWithFixedRetryAttempts(txn, + kConfigPrimaryPreferredSelector, + "admin", + BSON("listDatabases" << 1), + Shard::RetryPolicy::kIdempotent); + + if (!resultStatus.isOK()) { + return resultStatus.getStatus(); + } + if (!resultStatus.getValue().commandStatus.isOK()) { + return 
resultStatus.getValue().commandStatus; + } + + auto listDBResponse = std::move(resultStatus.getValue().response); + BSONElement dbListArray; + auto dbListStatus = bsonExtractTypedField(listDBResponse, "databases", Array, &dbListArray); + if (!dbListStatus.isOK()) { + return dbListStatus; + } + + BSONObjIterator iter(dbListArray.Obj()); + + while (iter.more()) { + auto dbEntry = iter.next().Obj(); + string name; + auto parseStatus = bsonExtractStringField(dbEntry, "name", &name); + + if (!parseStatus.isOK()) { + return parseStatus; + } + + if (name == "config" || name == "admin") { + builder->append(dbEntry); + } + } + + return Status::OK(); +} + +} // namespace mongo