/** * Copyright (C) 2015 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/catalog/replset/catalog_manager_replica_set.h" #include #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/dbclientinterface.h" #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_settings.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/time_support.h" namespace mongo { using std::set; using std::string; using std::unique_ptr; using std::vector; using str::stream; namespace { const Status notYetImplemented(ErrorCodes::InternalError, "Not yet implemented"); // todo remove // Until read committed is supported always write to the primary with majoirty write and read // from the secondary. That way we ensure that reads will see a consistent data. 
const ReadPreferenceSetting kConfigWriteSelector(ReadPreference::PrimaryOnly, TagSet{}); const ReadPreferenceSetting kConfigReadSelector(ReadPreference::SecondaryOnly, TagSet{}); const int kNotMasterNumRetries = 3; const Milliseconds kNotMasterRetryInterval{500}; void _toBatchError(const Status& status, BatchedCommandResponse* response) { response->clear(); response->setErrCode(status.code()); response->setErrMessage(status.reason()); response->setOk(false); } } // namespace CatalogManagerReplicaSet::CatalogManagerReplicaSet() = default; CatalogManagerReplicaSet::~CatalogManagerReplicaSet() = default; Status CatalogManagerReplicaSet::init(const ConnectionString& configCS, std::unique_ptr distLockManager) { invariant(configCS.type() == ConnectionString::SET); _configServerConnectionString = configCS; _distLockManager = std::move(distLockManager); return Status::OK(); } Status CatalogManagerReplicaSet::startup(bool upgrade) { return Status::OK(); } ConnectionString CatalogManagerReplicaSet::connectionString() const { return _configServerConnectionString; } void CatalogManagerReplicaSet::shutDown() { LOG(1) << "CatalogManagerReplicaSet::shutDown() called."; { std::lock_guard lk(_mutex); _inShutdown = true; } invariant(_distLockManager); _distLockManager->shutDown(); } Status CatalogManagerReplicaSet::enableSharding(const std::string& dbName) { return notYetImplemented; } Status CatalogManagerReplicaSet::shardCollection(const string& ns, const ShardKeyPattern& fieldsAndOrder, bool unique, vector* initPoints, set* initShardsIds) { return notYetImplemented; } Status CatalogManagerReplicaSet::createDatabase(const std::string& dbName) { return notYetImplemented; } StatusWith CatalogManagerReplicaSet::addShard( const string& name, const ConnectionString& shardConnectionString, const long long maxSize) { return notYetImplemented; } StatusWith CatalogManagerReplicaSet::removeShard(OperationContext* txn, const std::string& name) { return notYetImplemented; } Status 
CatalogManagerReplicaSet::updateDatabase(const std::string& dbName, const DatabaseType& db) { fassert(28684, db.validate()); return notYetImplemented; } StatusWith CatalogManagerReplicaSet::getDatabase(const std::string& dbName) { invariant(nsIsDbOnly(dbName)); // The two databases that are hosted on the config server are config and admin if (dbName == "config" || dbName == "admin") { DatabaseType dbt; dbt.setName(dbName); dbt.setSharded(false); dbt.setPrimary("config"); return dbt; } const auto configShard = grid.shardRegistry()->getShard("config"); const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); if (!readHost.isOK()) { return readHost.getStatus(); } auto findStatus = grid.shardRegistry()->exhaustiveFind( readHost.getValue(), NamespaceString(DatabaseType::ConfigNS), BSON(DatabaseType::name(dbName)), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& docs = findStatus.getValue(); if (docs.empty()) { return {ErrorCodes::NamespaceNotFound, stream() << "database " << dbName << " not found"}; } invariant(docs.size() == 1); return DatabaseType::fromBSON(docs.front()); } Status CatalogManagerReplicaSet::updateCollection(const std::string& collNs, const CollectionType& coll) { fassert(28683, coll.validate()); BatchedCommandResponse response; Status status = update(CollectionType::ConfigNS, BSON(CollectionType::fullNs(collNs)), coll.toBSON(), true, // upsert false, // multi &response); if (!status.isOK()) { return Status(status.code(), str::stream() << "collection metadata write failed: " << response.toBSON() << "; status: " << status.toString()); } return Status::OK(); } StatusWith CatalogManagerReplicaSet::getCollection(const std::string& collNs) { auto configShard = grid.shardRegistry()->getShard("config"); auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); if (!readHostStatus.isOK()) { return readHostStatus.getStatus(); } auto statusFind = grid.shardRegistry()->exhaustiveFind( 
readHostStatus.getValue(), NamespaceString(CollectionType::ConfigNS), BSON(CollectionType::fullNs(collNs)), 1); if (!statusFind.isOK()) { return statusFind.getStatus(); } const auto& retVal = statusFind.getValue(); if (retVal.empty()) { return Status(ErrorCodes::NamespaceNotFound, stream() << "collection " << collNs << " not found"); } invariant(retVal.size() == 1); return CollectionType::fromBSON(retVal.front()); } Status CatalogManagerReplicaSet::getCollections(const std::string* dbName, std::vector* collections) { return notYetImplemented; } Status CatalogManagerReplicaSet::dropCollection(const std::string& collectionNs) { return notYetImplemented; } void CatalogManagerReplicaSet::logAction(const ActionLogType& actionLog) { } void CatalogManagerReplicaSet::logChange(OperationContext* opCtx, const string& what, const string& ns, const BSONObj& detail) { } StatusWith CatalogManagerReplicaSet::getGlobalSettings(const string& key) { const auto configShard = grid.shardRegistry()->getShard("config"); const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); if (!readHost.isOK()) { return readHost.getStatus(); } auto findStatus = grid.shardRegistry()->exhaustiveFind( readHost.getValue(), NamespaceString(SettingsType::ConfigNS), BSON(SettingsType::key(key)), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& docs = findStatus.getValue(); if (docs.empty()) { return {ErrorCodes::NoMatchingDocument, str::stream() << "can't find settings document with key: " << key}; } BSONObj settingsDoc = docs.front(); StatusWith settingsResult = SettingsType::fromBSON(settingsDoc); if (!settingsResult.isOK()) { return {ErrorCodes::FailedToParse, str::stream() << "error while parsing settings document: " << settingsDoc << " : " << settingsResult.getStatus().toString()}; } const SettingsType& settings = settingsResult.getValue(); Status validationStatus = settings.validate(); if (!validationStatus.isOK()) { return validationStatus; } return 
settingsResult; } Status CatalogManagerReplicaSet::getDatabasesForShard(const string& shardName, vector* dbs) { return notYetImplemented; } Status CatalogManagerReplicaSet::getChunks(const Query& query, int nToReturn, vector* chunks) { auto configShard = grid.shardRegistry()->getShard("config"); auto readHostStatus = configShard->getTargeter()->findHost(kConfigReadSelector); if (!readHostStatus.isOK()) { return readHostStatus.getStatus(); } auto findStatus = grid.shardRegistry()->exhaustiveFind(readHostStatus.getValue(), NamespaceString(ChunkType::ConfigNS), query.obj, boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } for (const BSONObj& obj : findStatus.getValue()) { auto chunkRes = ChunkType::fromBSON(obj); if (!chunkRes.isOK()) { chunks->clear(); return {ErrorCodes::FailedToParse, stream() << "Failed to parse chunk with id (" << obj[ChunkType::name()].toString() << "): " << chunkRes.getStatus().reason()}; } chunks->push_back(chunkRes.getValue()); } return Status::OK(); } Status CatalogManagerReplicaSet::getTagsForCollection(const std::string& collectionNs, std::vector* tags) { return notYetImplemented; } StatusWith CatalogManagerReplicaSet::getTagForChunk(const std::string& collectionNs, const ChunkType& chunk) { return notYetImplemented; } Status CatalogManagerReplicaSet::getAllShards(vector* shards) { const auto configShard = grid.shardRegistry()->getShard("config"); const auto readHost = configShard->getTargeter()->findHost(kConfigReadSelector); if (!readHost.isOK()) { return readHost.getStatus(); } auto findStatus = grid.shardRegistry()->exhaustiveFind(readHost.getValue(), NamespaceString(ShardType::ConfigNS), BSONObj(), // no query filter boost::none); // no limit if (!findStatus.isOK()) { return findStatus.getStatus(); } for (const BSONObj& doc : findStatus.getValue()) { auto shardRes = ShardType::fromBSON(doc); if (!shardRes.isOK()) { shards->clear(); return {ErrorCodes::FailedToParse, stream() << "Failed to parse shard 
with id (" << doc[ShardType::name()].toString() << "): " << shardRes.getStatus().reason()}; } shards->push_back(shardRes.getValue()); } return Status::OK(); } bool CatalogManagerReplicaSet::isShardHost(const ConnectionString& connectionString) { return false; } bool CatalogManagerReplicaSet::runUserManagementWriteCommand(const std::string& commandName, const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { auto scopedDistLock = getDistLockManager()->lock("authorizationData", commandName, Seconds{5}); if (!scopedDistLock.isOK()) { return Command::appendCommandStatus(*result, scopedDistLock.getStatus()); } auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; for (int i = 0; i < kNotMasterNumRetries; ++i) { auto target = targeter->findHost(kConfigWriteSelector); if (!target.isOK()) { if (ErrorCodes::NotMaster == target.getStatus()) { notMasterStatus = target.getStatus(); sleepmillis(kNotMasterRetryInterval.count()); continue; } return Command::appendCommandStatus(*result, target.getStatus()); } auto response = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); if (!response.isOK()) { return Command::appendCommandStatus(*result, response.getStatus()); } Status commandStatus = Command::getStatusFromCommandResult(response.getValue()); if (ErrorCodes::NotMaster == commandStatus) { notMasterStatus = commandStatus; sleepmillis(kNotMasterRetryInterval.count()); continue; } result->appendElements(response.getValue()); return commandStatus.isOK(); } invariant(ErrorCodes::NotMaster == notMasterStatus); return Command::appendCommandStatus(*result, notMasterStatus); } bool CatalogManagerReplicaSet::runUserManagementReadCommand(const std::string& dbname, const BSONObj& cmdObj, BSONObjBuilder* result) { auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); auto target = targeter->findHost(kConfigReadSelector); if (!target.isOK()) { 
return Command::appendCommandStatus(*result, target.getStatus()); } auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), dbname, cmdObj); if (!resultStatus.isOK()) { return Command::appendCommandStatus(*result, resultStatus.getStatus()); } result->appendElements(resultStatus.getValue()); return Command::getStatusFromCommandResult(resultStatus.getValue()).isOK(); return false; } Status CatalogManagerReplicaSet::applyChunkOpsDeprecated(const BSONArray& updateOps, const BSONArray& preCondition) { return notYetImplemented; } DistLockManager* CatalogManagerReplicaSet::getDistLockManager() { invariant(_distLockManager); return _distLockManager.get(); } void CatalogManagerReplicaSet::writeConfigServerDirect( const BatchedCommandRequest& batchRequest, BatchedCommandResponse* batchResponse) { std::string dbname = batchRequest.getNSS().db().toString(); invariant (dbname == "config" || dbname == "admin"); const BSONObj cmdObj = batchRequest.toBSON(); auto targeter = grid.shardRegistry()->getShard("config")->getTargeter(); Status notMasterStatus{ErrorCodes::InternalError, "status not set"}; for (int i = 0; i < kNotMasterNumRetries; ++i) { auto target = targeter->findHost(kConfigWriteSelector); if (!target.isOK()) { if (ErrorCodes::NotMaster == target.getStatus()) { notMasterStatus = target.getStatus(); sleepmillis(kNotMasterRetryInterval.count()); continue; } _toBatchError(target.getStatus(), batchResponse); return; } auto resultStatus = grid.shardRegistry()->runCommand(target.getValue(), batchRequest.getNSS().db().toString(), batchRequest.toBSON()); if (!resultStatus.isOK()) { _toBatchError(resultStatus.getStatus(), batchResponse); return; } const BSONObj& commandResponse = resultStatus.getValue(); Status commandStatus = getStatusFromCommandResult(commandResponse); if (commandStatus == ErrorCodes::NotMaster) { notMasterStatus = commandStatus; sleepmillis(kNotMasterRetryInterval.count()); continue; } string errmsg; if 
(!batchResponse->parseBSON(commandResponse, &errmsg)) { _toBatchError(Status(ErrorCodes::FailedToParse, str::stream() << "Failed to parse config server response: " << errmsg), batchResponse); return; } return; // The normal case return point. } invariant(ErrorCodes::NotMaster == notMasterStatus); _toBatchError(notMasterStatus, batchResponse); } } // namespace mongo