/**
 * Copyright (C) 2016 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects
 * for all of the code used other than as permitted herein. If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so. If you do not
 * wish to do so, delete this exception statement from your version. If you
 * delete this exception statement from all source files in the program,
 * then also delete it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/s/chunk_move_write_concern_options.h" #include "mongo/db/s/sharding_state.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/commit_chunk_migration_request_type.h" namespace mongo { namespace { /** * This command takes the chunk being migrated ("migratedChunk") and generates a new version for it * that is written along with its new shard location ("toShard") to the chunks collection. It also * takes a control chunk ("controlChunk") and assigns it a new version as well so that the source * ("fromShard") shard's shardVersion will increases. If there is no control chunk, then the chunk * being migrated is the source shard's only remaining chunk. * * The new chunk version is generated by querying the highest chunk version of the collection, and * then incrementing that major value for both migrated and control chunks and setting the minor to * 0 for the migrated chunk and 1 for the control chunk. A global exclusive lock is held for the * duration of generating the new chunk version and writing to the chunks collection so that * yielding cannot occur. This assures that generated ChunkVersions are strictly monotonically * increasing -- a second process will not be able to query for max chunk version until the first * finishes writing the new highest chunk version it generated. 
* * Command Format: * { * _configsvrCommitChunkMigration: ., * migratedChunk: {min: , max: }, * controlChunk: {min: , max: }, (optional) * fromShard: "", * toShard: "", * } * * Returns: * { * migratedChunkVersion: , * controlChunkVersion: , (only present if a controlChunk is defined) * } * */ class ConfigSvrCommitChunkMigrationCommand : public Command { public: ConfigSvrCommitChunkMigrationCommand() : Command("_configsvrCommitChunkMigration") {} void help(std::stringstream& help) const override { help << "should not be calling this directly"; } bool slaveOk() const override { return false; } bool adminOnly() const override { return true; } virtual bool supportsWriteConcern(const BSONObj& cmd) const override { return true; } Status checkAuthForCommand(Client* client, const std::string& dbname, const BSONObj& cmdObj) override { if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( ResourcePattern::forClusterResource(), ActionType::internal)) { return Status(ErrorCodes::Unauthorized, "Unauthorized"); } return Status::OK(); } std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { return parseNsFullyQualified(dbname, cmdObj); } static void checkChunkIsOnShard(OperationContext* txn, const NamespaceString& nss, const BSONObj& min, const BSONObj& max, const ShardId& shard) { BSONObj chunkQuery = BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max << ChunkType::shard() << shard); // Must use kLocalReadConcern because using majority will set a flag on the recovery unit // that conflicts with the subsequent writes in the CommitChunkMigration command. 
auto findResponse = uassertStatusOK( Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(ChunkType::ConfigNS), chunkQuery, BSONObj(), 1)); uassert(40165, str::stream() << "Could not find the chunk (" << chunkQuery.toString() << ") on the shard. Cannot execute the migration commit with invalid chunks.", !findResponse.docs.empty()); } BSONObj makeCommitChunkApplyOpsCommand( const NamespaceString& nss, const ChunkRange& migratedChunkRange, const boost::optional& controlChunkRange, const ChunkVersion newMigratedChunkVersion, const boost::optional newControlChunkVersion, StringData toShard, StringData fromShard) { // Update migratedChunk's version and shard. BSONArrayBuilder updates; { BSONObjBuilder op; op.append("op", "u"); op.appendBool("b", false); // No upserting op.append("ns", ChunkType::ConfigNS); BSONObjBuilder n(op.subobjStart("o")); n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin())); newMigratedChunkVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); n.append(ChunkType::ns(), nss.ns()); n.append(ChunkType::min(), migratedChunkRange.getMin()); n.append(ChunkType::max(), migratedChunkRange.getMax()); n.append(ChunkType::shard(), toShard); n.done(); BSONObjBuilder q(op.subobjStart("o2")); q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin())); q.done(); updates.append(op.obj()); } // If we have a controlChunk, update its chunk version. 
if (controlChunkRange) { BSONObjBuilder op; op.append("op", "u"); op.appendBool("b", false); op.append("ns", ChunkType::ConfigNS); BSONObjBuilder n(op.subobjStart("o")); n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin())); newControlChunkVersion->addToBSON(n, ChunkType::DEPRECATED_lastmod()); n.append(ChunkType::ns(), nss.ns()); n.append(ChunkType::min(), controlChunkRange->getMin()); n.append(ChunkType::max(), controlChunkRange->getMax()); n.append(ChunkType::shard(), fromShard); n.done(); BSONObjBuilder q(op.subobjStart("o2")); q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin())); q.done(); updates.append(op.obj()); } // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will // fail because of the GlobalWrite lock CommitChunkMigration already holds. Replication will // not be able to take the lock it requires. return BSON("applyOps" << updates.arr()); } bool run(OperationContext* txn, const std::string& dbName, BSONObj& cmdObj, int options, std::string& errmsg, BSONObjBuilder& result) override { const NamespaceString nss = NamespaceString(parseNs(dbName, cmdObj)); CommitChunkMigrationRequest commitChunkMigrationRequest = uassertStatusOK(CommitChunkMigrationRequest::createFromCommand(nss, cmdObj)); // Run operations under a nested lock as a hack to prevent yielding. When query/applyOps // commands are called, they will take a second lock, and the PlanExecutor will be unable to // yield. // // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions // for migrated chunks are generated within the command. Therefore it cannot be allowed to // yield between generating the ChunkVersion and committing it to the database with // applyOps. Lock::GlobalWrite firstGlobalWriteLock(txn->lockState()); // Check that migratedChunk and controlChunk are where they should be, on fromShard. 
checkChunkIsOnShard(txn, nss, commitChunkMigrationRequest.getMigratedChunkRange().getMin(), commitChunkMigrationRequest.getMigratedChunkRange().getMax(), commitChunkMigrationRequest.getFromShard()); if (commitChunkMigrationRequest.hasControlChunkRange()) { checkChunkIsOnShard(txn, nss, commitChunkMigrationRequest.getControlChunkRange().getMin(), commitChunkMigrationRequest.getControlChunkRange().getMax(), commitChunkMigrationRequest.getFromShard()); } // Generate the new chunk version (CV). Query the current max CV of the collection. Use the // incremented major version of the result returned. Migrating chunk's minor version will // be 0, control chunk's minor version will be 1 (if control chunk is present). // Must use kLocalReadConcern because using majority will set a flag on the recovery unit // that conflicts with the subsequent writes. auto findResponse = uassertStatusOK( Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(ChunkType::ConfigNS), BSON("ns" << nss.ns()), BSON(ChunkType::DEPRECATED_lastmod << -1), 1)); std::vector chunksVector = findResponse.docs; uassert(40164, str::stream() << "Tried to find max chunk version for collection '" << nss.ns() << ", but found no chunks", !chunksVector.empty()); ChunkVersion currentMaxVersion = ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); // Generate the new versions of migratedChunk and controlChunk. 
ChunkVersion newMigratedChunkVersion = ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch()); boost::optional newControlChunkVersion = boost::none; boost::optional newControlChunkRange = boost::none; if (commitChunkMigrationRequest.hasControlChunkRange()) { newControlChunkVersion = ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch()); newControlChunkRange = commitChunkMigrationRequest.getControlChunkRange(); } auto applyOpsCommandResponse = Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand( txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, nss.db().toString(), makeCommitChunkApplyOpsCommand( nss, commitChunkMigrationRequest.getMigratedChunkRange(), newControlChunkRange, newMigratedChunkVersion, newControlChunkVersion, commitChunkMigrationRequest.getToShard().toString(), commitChunkMigrationRequest.getFromShard().toString()), Shard::RetryPolicy::kIdempotent); if (!applyOpsCommandResponse.isOK()) { return appendCommandStatus(result, applyOpsCommandResponse.getStatus()); } if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) { return appendCommandStatus(result, applyOpsCommandResponse.getValue().commandStatus); } newMigratedChunkVersion.appendWithFieldForCommands(&result, "migratedChunkVersion"); if (commitChunkMigrationRequest.hasControlChunkRange()) { newControlChunkVersion->appendWithFieldForCommands(&result, "controlChunkVersion"); } return true; } } configsvrCommitChunkMigrationCommand; } // namespace } // namespace mongo