/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/s/chunk_move_write_concern_options.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/commit_chunk_migration_request_type.h"
namespace mongo {
namespace {
/**
 * This command takes the chunk being migrated ("migratedChunk") and generates a new version for it
 * that is written along with its new shard location ("toShard") to the chunks collection. It also
 * takes a control chunk ("controlChunk") and assigns it a new version as well so that the source
 * ("fromShard") shard's shardVersion will increase. If there is no control chunk, then the chunk
 * being migrated is the source shard's only remaining chunk.
 *
 * The new chunk version is generated by querying the highest chunk version of the collection, and
 * then incrementing that major value for both migrated and control chunks and setting the minor to
 * 0 for the migrated chunk and 1 for the control chunk. A global exclusive lock is held for the
 * duration of generating the new chunk version and writing to the chunks collection so that
 * yielding cannot occur. This assures that generated ChunkVersions are strictly monotonically
 * increasing -- a second process will not be able to query for max chunk version until the first
 * finishes writing the new highest chunk version it generated.
 *
 * Command Format:
 * {
 *     _configsvrCommitChunkMigration: <database>.<collection>,
 *     migratedChunk: {min: <min_value>, max: <max_value>},
 *     controlChunk: {min: <min_value>, max: <max_value>},  (optional)
 *     fromShard: "<from_shard_id>",
 *     toShard: "<to_shard_id>",
 * }
 *
 * Returns:
 * {
 *     migratedChunkVersion: <ChunkVersion>,
 *     controlChunkVersion: <ChunkVersion>,  (only present if a controlChunk is defined)
 * }
 *
 */
class ConfigSvrCommitChunkMigrationCommand : public Command {
public:
ConfigSvrCommitChunkMigrationCommand() : Command("_configsvrCommitChunkMigration") {}
void help(std::stringstream& help) const override {
help << "should not be calling this directly";
}
bool slaveOk() const override {
return false;
}
bool adminOnly() const override {
return true;
}
virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
return true;
}
Status checkAuthForCommand(Client* client,
const std::string& dbname,
const BSONObj& cmdObj) override {
if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
ResourcePattern::forClusterResource(), ActionType::internal)) {
return Status(ErrorCodes::Unauthorized, "Unauthorized");
}
return Status::OK();
}
std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
return parseNsFullyQualified(dbname, cmdObj);
}
static void checkChunkIsOnShard(OperationContext* txn,
const NamespaceString& nss,
const BSONObj& min,
const BSONObj& max,
const ShardId& shard) {
BSONObj chunkQuery =
BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max
<< ChunkType::shard()
<< shard);
// Must use kLocalReadConcern because using majority will set a flag on the recovery unit
// that conflicts with the subsequent writes in the CommitChunkMigration command.
auto findResponse = uassertStatusOK(
Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
NamespaceString(ChunkType::ConfigNS),
chunkQuery,
BSONObj(),
1));
uassert(40165,
str::stream()
<< "Could not find the chunk ("
<< chunkQuery.toString()
<< ") on the shard. Cannot execute the migration commit with invalid chunks.",
!findResponse.docs.empty());
}
BSONObj makeCommitChunkApplyOpsCommand(
const NamespaceString& nss,
const ChunkRange& migratedChunkRange,
const boost::optional& controlChunkRange,
const ChunkVersion newMigratedChunkVersion,
const boost::optional newControlChunkVersion,
StringData toShard,
StringData fromShard) {
// Update migratedChunk's version and shard.
BSONArrayBuilder updates;
{
BSONObjBuilder op;
op.append("op", "u");
op.appendBool("b", false); // No upserting
op.append("ns", ChunkType::ConfigNS);
BSONObjBuilder n(op.subobjStart("o"));
n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin()));
newMigratedChunkVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
n.append(ChunkType::ns(), nss.ns());
n.append(ChunkType::min(), migratedChunkRange.getMin());
n.append(ChunkType::max(), migratedChunkRange.getMax());
n.append(ChunkType::shard(), toShard);
n.done();
BSONObjBuilder q(op.subobjStart("o2"));
q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin()));
q.done();
updates.append(op.obj());
}
// If we have a controlChunk, update its chunk version.
if (controlChunkRange) {
BSONObjBuilder op;
op.append("op", "u");
op.appendBool("b", false);
op.append("ns", ChunkType::ConfigNS);
BSONObjBuilder n(op.subobjStart("o"));
n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin()));
newControlChunkVersion->addToBSON(n, ChunkType::DEPRECATED_lastmod());
n.append(ChunkType::ns(), nss.ns());
n.append(ChunkType::min(), controlChunkRange->getMin());
n.append(ChunkType::max(), controlChunkRange->getMax());
n.append(ChunkType::shard(), fromShard);
n.done();
BSONObjBuilder q(op.subobjStart("o2"));
q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin()));
q.done();
updates.append(op.obj());
}
// Do not give applyOps a write concern. If applyOps tries to wait for replication, it will
// fail because of the GlobalWrite lock CommitChunkMigration already holds. Replication will
// not be able to take the lock it requires.
return BSON("applyOps" << updates.arr());
}
bool run(OperationContext* txn,
const std::string& dbName,
BSONObj& cmdObj,
int options,
std::string& errmsg,
BSONObjBuilder& result) override {
const NamespaceString nss = NamespaceString(parseNs(dbName, cmdObj));
CommitChunkMigrationRequest commitChunkMigrationRequest =
uassertStatusOK(CommitChunkMigrationRequest::createFromCommand(nss, cmdObj));
// Run operations under a nested lock as a hack to prevent yielding. When query/applyOps
// commands are called, they will take a second lock, and the PlanExecutor will be unable to
// yield.
//
// ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions
// for migrated chunks are generated within the command. Therefore it cannot be allowed to
// yield between generating the ChunkVersion and committing it to the database with
// applyOps.
Lock::GlobalWrite firstGlobalWriteLock(txn->lockState());
// Check that migratedChunk and controlChunk are where they should be, on fromShard.
checkChunkIsOnShard(txn,
nss,
commitChunkMigrationRequest.getMigratedChunkRange().getMin(),
commitChunkMigrationRequest.getMigratedChunkRange().getMax(),
commitChunkMigrationRequest.getFromShard());
if (commitChunkMigrationRequest.hasControlChunkRange()) {
checkChunkIsOnShard(txn,
nss,
commitChunkMigrationRequest.getControlChunkRange().getMin(),
commitChunkMigrationRequest.getControlChunkRange().getMax(),
commitChunkMigrationRequest.getFromShard());
}
// Generate the new chunk version (CV). Query the current max CV of the collection. Use the
// incremented major version of the result returned. Migrating chunk's minor version will
// be 0, control chunk's minor version will be 1 (if control chunk is present).
// Must use kLocalReadConcern because using majority will set a flag on the recovery unit
// that conflicts with the subsequent writes.
auto findResponse = uassertStatusOK(
Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
repl::ReadConcernLevel::kLocalReadConcern,
NamespaceString(ChunkType::ConfigNS),
BSON("ns" << nss.ns()),
BSON(ChunkType::DEPRECATED_lastmod << -1),
1));
std::vector chunksVector = findResponse.docs;
uassert(40164,
str::stream() << "Tried to find max chunk version for collection '" << nss.ns()
<< ", but found no chunks",
!chunksVector.empty());
ChunkVersion currentMaxVersion =
ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
// Generate the new versions of migratedChunk and controlChunk.
ChunkVersion newMigratedChunkVersion =
ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch());
boost::optional newControlChunkVersion = boost::none;
boost::optional newControlChunkRange = boost::none;
if (commitChunkMigrationRequest.hasControlChunkRange()) {
newControlChunkVersion =
ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch());
newControlChunkRange = commitChunkMigrationRequest.getControlChunkRange();
}
auto applyOpsCommandResponse =
Grid::get(txn)->shardRegistry()->getConfigShard()->runCommand(
txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
nss.db().toString(),
makeCommitChunkApplyOpsCommand(
nss,
commitChunkMigrationRequest.getMigratedChunkRange(),
newControlChunkRange,
newMigratedChunkVersion,
newControlChunkVersion,
commitChunkMigrationRequest.getToShard().toString(),
commitChunkMigrationRequest.getFromShard().toString()),
Shard::RetryPolicy::kIdempotent);
if (!applyOpsCommandResponse.isOK()) {
return appendCommandStatus(result, applyOpsCommandResponse.getStatus());
}
if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) {
return appendCommandStatus(result, applyOpsCommandResponse.getValue().commandStatus);
}
newMigratedChunkVersion.appendWithFieldForCommands(&result, "migratedChunkVersion");
if (commitChunkMigrationRequest.hasControlChunkRange()) {
newControlChunkVersion->appendWithFieldForCommands(&result, "controlChunkVersion");
}
return true;
}
} configsvrCommitChunkMigrationCommand;
} // namespace
} // namespace mongo