/**
 * Copyright (C) 2017 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/client/connpool.h"
#include "mongo/db/auth/action_set.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/server_options.h"
#include "mongo/s/catalog/type_database.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/move_primary_gen.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"

namespace mongo {
namespace {

const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
                                                WriteConcernOptions::SyncMode::UNSET,
                                                Seconds(60));

/**
 * Internal sharding command run on config servers to change a database's primary shard.
 */
class ConfigSvrMovePrimaryCommand : public BasicCommand {
public:
    ConfigSvrMovePrimaryCommand() : BasicCommand("_configsvrMovePrimary") {}

    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
        return AllowedOnSecondary::kNever;
    }

    virtual bool adminOnly() const {
        return true;
    }

    virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
        return true;
    }

    std::string help() const override {
        return "Internal command, which is exported by the sharding config server. Do not call "
               "directly. Reassigns the primary shard of a database.";
    }

    virtual Status checkAuthForCommand(Client* client,
                                       const std::string& dbname,
                                       const BSONObj& cmdObj) const override {
        if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
                ResourcePattern::forClusterResource(), ActionType::internal)) {
            return Status(ErrorCodes::Unauthorized, "Unauthorized");
        }
        return Status::OK();
    }

    virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
        const auto nsElt = cmdObj.firstElement();
        uassert(ErrorCodes::InvalidNamespace,
                "'movePrimary' must be of type String",
                nsElt.type() == BSONType::String);
        return nsElt.str();
    }
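    // Illustrative invocation, sent to a config server's admin database. The database and
    // shard names here are hypothetical examples, not values taken from this file:
    //
    //   db.adminCommand({
    //       _configsvrMovePrimary: "test",
    //       to: "shard0001",
    //       writeConcern: {w: "majority"}
    //   })
    //
    // parseNs() above extracts the database name from the command's first element; run()
    // below rejects the request unless it carries majority writeConcern.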
    bool run(OperationContext* opCtx,
             const std::string& dbname_unused,
             const BSONObj& cmdObj,
             BSONObjBuilder& result) override {
        if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
            return CommandHelpers::appendCommandStatus(
                result,
                Status(ErrorCodes::IllegalOperation,
                       "_configsvrMovePrimary can only be run on config servers"));
        }

        auto movePrimaryRequest =
            MovePrimary::parse(IDLParserErrorContext("ConfigSvrMovePrimary"), cmdObj);
        const auto dbname = parseNs("", cmdObj);

        uassert(
            ErrorCodes::InvalidNamespace,
            str::stream() << "invalid db name specified: " << dbname,
            NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));

        if (dbname == NamespaceString::kAdminDb || dbname == NamespaceString::kConfigDb ||
            dbname == NamespaceString::kLocalDb) {
            return CommandHelpers::appendCommandStatus(
                result,
                {ErrorCodes::InvalidOptions,
                 str::stream() << "Can't move primary for " << dbname << " database"});
        }

        uassert(ErrorCodes::InvalidOptions,
                str::stream() << "movePrimary must be called with majority writeConcern, got "
                              << cmdObj,
                opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);

        const std::string to = movePrimaryRequest.getTo().toString();

        if (to.empty()) {
            return CommandHelpers::appendCommandStatus(
                result,
                {ErrorCodes::InvalidOptions,
                 str::stream() << "you have to specify where you want to move it"});
        }

        auto const catalogClient = Grid::get(opCtx)->catalogClient();
        auto const catalogCache = Grid::get(opCtx)->catalogCache();
        auto const shardRegistry = Grid::get(opCtx)->shardRegistry();

        auto dbDistLock = uassertStatusOK(catalogClient->getDistLockManager()->lock(
            opCtx, dbname, "movePrimary", DistLockManager::kDefaultLockTimeout));

        auto dbType = uassertStatusOK(catalogClient->getDatabase(
                                          opCtx, dbname, repl::ReadConcernLevel::kLocalReadConcern))
                          .value;

        const auto fromShard = uassertStatusOK(shardRegistry->getShard(opCtx, dbType.getPrimary()));

        const auto toShard = [&]() {
            auto toShardStatus = shardRegistry->getShard(opCtx, to);

            if (!toShardStatus.isOK()) {
                log() << "Could not move database '" << dbname << "' to shard '" << to << "'"
                      << causedBy(toShardStatus.getStatus());
                uassertStatusOKWithContext(
                    toShardStatus.getStatus(),
                    str::stream() << "Could not move database '" << dbname << "' to shard '" << to
                                  << "'");
            }

            return toShardStatus.getValue();
        }();

        if (fromShard->getId() == toShard->getId()) {
            // We did a local read of the database entry above and found that this movePrimary
            // request was already satisfied. However, the data may not be majority committed (a
            // previous movePrimary attempt may have failed with a write concern error).
            // Since the current Client doesn't know the opTime of the last write to the database
            // entry, make it wait for the last opTime in the system when we wait for writeConcern.
            repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
            result << "primary" << toShard->toString();
            return true;
        }
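        // Sketch of the request forwarded to the current primary shard in the FCV 4.0 branch
        // below, assuming the ShardMovePrimary IDL serialization; the field values shown are
        // hypothetical:
        //
        //   {_movePrimary: "test", to: "shard0001", writeConcern: {w: "majority"}}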
        // Under FCV 4.0 (or while upgrading to it), delegate the movePrimary work to the
        // database's current primary shard.
        if (serverGlobalParams.featureCompatibility.getVersion() ==
                ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40 ||
            serverGlobalParams.featureCompatibility.getVersion() ==
                ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40) {
            const NamespaceString nss(dbname);

            ShardMovePrimary shardMovePrimaryRequest;
            shardMovePrimaryRequest.set_movePrimary(nss);
            shardMovePrimaryRequest.setTo(toShard->getId().toString());

            auto cmdResponse = uassertStatusOK(fromShard->runCommandWithFixedRetryAttempts(
                opCtx,
                ReadPreferenceSetting(ReadPreference::PrimaryOnly),
                "admin",
                CommandHelpers::appendMajorityWriteConcern(CommandHelpers::appendPassthroughFields(
                    cmdObj, shardMovePrimaryRequest.toBSON())),
                Shard::RetryPolicy::kIdempotent));

            CommandHelpers::filterCommandReplyForPassthrough(cmdResponse.response, &result);

            return true;
        }

        // The rest of this function will only be executed under FCV 3.6 (or downgrading).

        log() << "Moving " << dbname << " primary from: " << fromShard->toString()
              << " to: " << toShard->toString();

        const auto shardedColls = catalogClient->getAllShardedCollectionsForDb(
            opCtx, dbname, repl::ReadConcernLevel::kLocalReadConcern);

        // Record start in changelog
        uassertStatusOK(catalogClient->logChange(
            opCtx,
            "movePrimary.start",
            dbname,
            _buildMoveLogEntry(dbname, fromShard->toString(), toShard->toString(), shardedColls),
            ShardingCatalogClient::kMajorityWriteConcern));

        ScopedDbConnection toconn(toShard->getConnString());
        ON_BLOCK_EXIT([&toconn] { toconn.done(); });

        // TODO ERH - we need a clone command which replays operations from clone start to now;
        // we can just use local.oplog.$main
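        // Sketch of the clone request issued to the new primary below; the namespace values are
        // hypothetical. collsToIgnore lists the database's sharded collections, whose placement
        // is already tracked per-chunk and which therefore must not be copied:
        //
        //   {clone: <fromShard connection string>, collsToIgnore: ["test.sharded1", ...],
        //    bypassDocumentValidation: true, writeConcern: {w: "majority"}}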
        BSONObj cloneRes;
        bool hasWCError = false;

        {
            BSONArrayBuilder barr;
            for (const auto& shardedColl : shardedColls) {
                barr.append(shardedColl.ns());
            }

            const bool worked = toconn->runCommand(
                dbname,
                BSON("clone" << fromShard->getConnString().toString() << "collsToIgnore"
                             << barr.arr()
                             << bypassDocumentValidationCommandOption()
                             << true
                             << "writeConcern"
                             << opCtx->getWriteConcern().toBSON()),
                cloneRes);

            if (!worked) {
                log() << "clone failed" << redact(cloneRes);
                return CommandHelpers::appendCommandStatus(
                    result, {ErrorCodes::OperationFailed, str::stream() << "clone failed"});
            }

            if (auto wcErrorElem = cloneRes["writeConcernError"]) {
                appendWriteConcernErrorToCmdResponse(toShard->getId(), wcErrorElem, result);
                hasWCError = true;
            }
        }

        {
            // Check if the FCV has been changed under us.
            invariant(!opCtx->lockState()->isLocked());
            Lock::SharedLock lk(opCtx->lockState(), FeatureCompatibilityVersion::fcvLock);

            // If we are upgrading to (or are fully on) FCV 4.0, then fail. If we do not fail, we
            // will potentially write an unversioned database in a schema that requires versions.
            if (serverGlobalParams.featureCompatibility.getVersion() ==
                    ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo40 ||
                serverGlobalParams.featureCompatibility.getVersion() ==
                    ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40) {
                uasserted(ErrorCodes::ConflictingOperationInProgress,
                          "committing movePrimary failed due to version mismatch");
            }

            // Update the new primary in the config server metadata.
            dbType.setPrimary(toShard->getId());
            uassertStatusOK(catalogClient->updateDatabase(opCtx, dbname, dbType));
        }

        // Ensure the next attempt to retrieve the database or any of its collections will do a
        // full reload.
        catalogCache->purgeDatabase(dbname);

        const auto oldPrimary = fromShard->getConnString().toString();

        ScopedDbConnection fromconn(fromShard->getConnString());
        ON_BLOCK_EXIT([&fromconn] { fromconn.done(); });

        if (shardedColls.empty()) {
            log() << "movePrimary dropping database on " << oldPrimary
                  << ", no sharded collections in " << dbname;

            try {
                BSONObj dropDBInfo;
                fromconn->dropDatabase(dbname.c_str(), opCtx->getWriteConcern(), &dropDBInfo);
                if (!hasWCError) {
                    if (auto wcErrorElem = dropDBInfo["writeConcernError"]) {
                        appendWriteConcernErrorToCmdResponse(
                            fromShard->getId(), wcErrorElem, result);
                        hasWCError = true;
                    }
                }
            } catch (DBException& e) {
                e.addContext(str::stream() << "movePrimary could not drop the database " << dbname
                                           << " on " << oldPrimary);
                throw;
            }
        } else if (cloneRes["clonedColls"].type() != Array) {
            // Legacy behavior from an old mongod with sharded collections: *do not* delete the
            // database, but inform the user that they can drop the remaining unsharded
            // collections manually (or ignore them).
            warning() << "movePrimary legacy mongod behavior detected. "
                      << "User must manually remove unsharded collections in database " << dbname
                      << " on " << oldPrimary;
        } else {
            // Sharded collections exist on the old primary, so drop only the cloned (unsharded)
            // collections.
            BSONObjIterator it(cloneRes["clonedColls"].Obj());
            while (it.more()) {
                BSONElement el = it.next();
                if (el.type() == String) {
                    try {
                        log() << "movePrimary dropping cloned collection " << el.String() << " on "
                              << oldPrimary;
                        BSONObj dropCollInfo;
                        fromconn->dropCollection(
                            el.String(), opCtx->getWriteConcern(), &dropCollInfo);
                        if (!hasWCError) {
                            if (auto wcErrorElem = dropCollInfo["writeConcernError"]) {
                                appendWriteConcernErrorToCmdResponse(
                                    fromShard->getId(), wcErrorElem, result);
                                hasWCError = true;
                            }
                        }
                    } catch (DBException& e) {
                        e.addContext(str::stream()
                                     << "movePrimary could not drop the cloned collection "
                                     << el.String() << " on " << oldPrimary);
                        throw;
                    }
                }
            }
        }

        result << "primary" << toShard->toString();

        // Record finish in changelog
        uassertStatusOK(catalogClient->logChange(
            opCtx,
            "movePrimary",
            dbname,
            _buildMoveLogEntry(dbname, oldPrimary, toShard->toString(), shardedColls),
            ShardingCatalogClient::kMajorityWriteConcern));

        return true;
    }

private:
    static BSONObj _buildMoveLogEntry(const std::string& db,
                                      const std::string& from,
                                      const std::string& to,
                                      const std::vector<NamespaceString>& shardedColls) {
        BSONObjBuilder details;
        details.append("database", db);
        details.append("from", from);
        details.append("to", to);

        BSONArrayBuilder collB(details.subarrayStart("shardedCollections"));
        for (const auto& shardedColl : shardedColls) {
            collB.append(shardedColl.ns());
        }
        collB.done();

        return details.obj();
    }

} configsvrMovePrimaryCmd;

}  // namespace
}  // namespace mongo