/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/catalog/sharding_catalog_manager_impl.h" #include "mongo/base/status_with.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connection_string.h" #include "mongo/client/read_preference.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace { MONGO_FP_DECLARE(migrationCommitVersionError); /** * Append min, max and version information from chunk to the buffer for logChange purposes. */ void appendShortVersion(BufBuilder* b, const ChunkType& chunk) { BSONObjBuilder bb(*b); bb.append(ChunkType::min(), chunk.getMin()); bb.append(ChunkType::max(), chunk.getMax()); if (chunk.isVersionSet()) chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod()); bb.done(); } BSONArray buildMergeChunksApplyOpsUpdates(const std::vector& chunksToMerge, const ChunkVersion& mergeVersion) { BSONArrayBuilder updates; // Build an update operation to expand the first chunk into the newly merged chunk { BSONObjBuilder op; op.append("op", "u"); op.appendBool("b", false); // no upsert op.append("ns", ChunkType::ConfigNS); // expand first chunk into newly merged chunk ChunkType mergedChunk(chunksToMerge.front()); mergedChunk.setMax(chunksToMerge.back().getMax()); // fill in additional details for sending through applyOps mergedChunk.setVersion(mergeVersion); // add the new chunk information as the update object op.append("o", mergedChunk.toConfigBSON()); // query object op.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); updates.append(op.obj()); } // Build update operations to delete the rest of the chunks to be merged. Remember not // to delete the first chunk we're expanding for (size_t i = 1; i < chunksToMerge.size(); ++i) { BSONObjBuilder op; op.append("op", "d"); op.append("ns", ChunkType::ConfigNS); op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName()))); updates.append(op.obj()); } return updates.arr(); } BSONArray buildMergeChunksApplyOpsPrecond(const std::vector& chunksToMerge, const ChunkVersion& collVersion) { BSONArrayBuilder preCond; for (auto chunk : chunksToMerge) { BSONObjBuilder b; b.append("ns", ChunkType::ConfigNS); b.append( "q", BSON("query" << BSON(ChunkType::ns(chunk.getNS()) << ChunkType::min(chunk.getMin()) << ChunkType::max(chunk.getMax())) << "orderby" << BSON(ChunkType::DEPRECATED_lastmod() << -1))); b.append("res", BSON(ChunkType::DEPRECATED_epoch(collVersion.epoch()) << ChunkType::shard(chunk.getShard().toString()))); preCond.append(b.obj()); } return preCond.arr(); } Status checkChunkIsOnShard(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& min, const BSONObj& max, const ShardId& shard) { BSONObj chunkQuery = BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max << ChunkType::shard() << shard); // Must use local read concern because we're going to perform subsequent writes. auto findResponseWith = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(ChunkType::ConfigNS), chunkQuery, BSONObj(), 1); if (!findResponseWith.isOK()) { return findResponseWith.getStatus(); } if (findResponseWith.getValue().docs.empty()) { return {ErrorCodes::Error(40165), str::stream() << "Could not find the chunk (" << chunkQuery.toString() << ") on the shard. Cannot execute the migration commit with invalid chunks."}; } return Status::OK(); } BSONObj makeCommitChunkApplyOpsCommand(const NamespaceString& nss, const ChunkType& migratedChunk, const boost::optional& controlChunk, StringData fromShard, StringData toShard) { // Update migratedChunk's version and shard. BSONArrayBuilder updates; { BSONObjBuilder op; op.append("op", "u"); op.appendBool("b", false); // No upserting op.append("ns", ChunkType::ConfigNS); BSONObjBuilder n(op.subobjStart("o")); n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin())); migratedChunk.getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod()); n.append(ChunkType::ns(), nss.ns()); n.append(ChunkType::min(), migratedChunk.getMin()); n.append(ChunkType::max(), migratedChunk.getMax()); n.append(ChunkType::shard(), toShard); n.done(); BSONObjBuilder q(op.subobjStart("o2")); q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin())); q.done(); updates.append(op.obj()); } // If we have a controlChunk, update its chunk version. if (controlChunk) { BSONObjBuilder op; op.append("op", "u"); op.appendBool("b", false); op.append("ns", ChunkType::ConfigNS); BSONObjBuilder n(op.subobjStart("o")); n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin())); controlChunk->getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod()); n.append(ChunkType::ns(), nss.ns()); n.append(ChunkType::min(), controlChunk->getMin()); n.append(ChunkType::max(), controlChunk->getMax()); n.append(ChunkType::shard(), fromShard); n.done(); BSONObjBuilder q(op.subobjStart("o2")); q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin())); q.done(); updates.append(op.obj()); } // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will fail // because of the GlobalWrite lock CommitChunkMigration already holds. Replication will not be // able to take the lock it requires. return BSON("applyOps" << updates.arr()); } } // namespace Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* opCtx, const NamespaceString& ns, const OID& requestEpoch, const ChunkRange& range, const std::vector& splitPoints, const std::string& shardName) { // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ // move chunks on different collections to proceed in parallel Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); // Get the chunk with highest version for this namespace auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(ChunkType::ConfigNS), BSON("ns" << ns.ns()), BSON(ChunkType::DEPRECATED_lastmod << -1), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& chunksVector = findStatus.getValue().docs; if (chunksVector.empty()) return {ErrorCodes::IllegalOperation, "collection does not exist, isn't sharded, or has no chunks"}; ChunkVersion collVersion = ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); // Return an error if epoch of chunk does not match epoch of request if (collVersion.epoch() != requestEpoch) { return {ErrorCodes::StaleEpoch, "epoch of chunk does not match epoch of request. This most likely means " "that the collection was dropped and re-created."}; } std::vector newChunks; ChunkVersion currentMaxVersion = collVersion; auto startKey = range.getMin(); auto newChunkBounds(splitPoints); newChunkBounds.push_back(range.getMax()); BSONArrayBuilder updates; for (const auto& endKey : newChunkBounds) { // Verify the split points are all within the chunk if (endKey.woCompare(range.getMax()) != 0 && !range.containsKey(endKey)) { return {ErrorCodes::InvalidOptions, str::stream() << "Split key " << endKey << " not contained within chunk " << range.toString()}; } // Verify the split points came in increasing order if (endKey.woCompare(startKey) < 0) { return { ErrorCodes::InvalidOptions, str::stream() << "Split keys must be specified in strictly increasing order. Key " << endKey << " was specified after " << startKey << "."}; } // Verify that splitPoints are not repeated if (endKey.woCompare(startKey) == 0) { return {ErrorCodes::InvalidOptions, str::stream() << "Split on lower bound of chunk " << ChunkRange(startKey, endKey).toString() << "is not allowed"}; } // verify that splits don't create too-big shard keys Status shardKeyStatus = ShardKeyPattern::checkShardKeySize(endKey); if (!shardKeyStatus.isOK()) { return shardKeyStatus; } // splits only update the 'minor' portion of version currentMaxVersion.incMinor(); // build an update operation against the chunks collection of the config database // with upsert true BSONObjBuilder op; op.append("op", "u"); op.appendBool("b", true); op.append("ns", ChunkType::ConfigNS); // add the modified (new) chunk information as the update object BSONObjBuilder n(op.subobjStart("o")); n.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); currentMaxVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); n.append(ChunkType::ns(), ns.ns()); n.append(ChunkType::min(), startKey); n.append(ChunkType::max(), endKey); n.append(ChunkType::shard(), shardName); n.done(); // add the chunk's _id as the query part of the update statement BSONObjBuilder q(op.subobjStart("o2")); q.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); q.done(); updates.append(op.obj()); // remember this chunk info for logging later ChunkType chunk; chunk.setMin(startKey); chunk.setMax(endKey); chunk.setVersion(currentMaxVersion); newChunks.push_back(std::move(chunk)); startKey = endKey; } BSONArrayBuilder preCond; { BSONObjBuilder b; b.append("ns", ChunkType::ConfigNS); b.append("q", BSON("query" << BSON(ChunkType::ns(ns.ns()) << ChunkType::min() << range.getMin() << ChunkType::max() << range.getMax()) << "orderby" << BSON(ChunkType::DEPRECATED_lastmod() << -1))); { BSONObjBuilder bb(b.subobjStart("res")); bb.append(ChunkType::DEPRECATED_epoch(), requestEpoch); bb.append(ChunkType::shard(), shardName); } preCond.append(b.obj()); } // apply the batch of updates to remote and local metadata Status applyOpsStatus = Grid::get(opCtx)->catalogClient(opCtx)->applyChunkOpsDeprecated( opCtx, updates.arr(), preCond.arr(), ns.ns(), currentMaxVersion, WriteConcernOptions(), repl::ReadConcernLevel::kLocalReadConcern); if (!applyOpsStatus.isOK()) { return applyOpsStatus; } // log changes BSONObjBuilder logDetail; { BSONObjBuilder b(logDetail.subobjStart("before")); b.append(ChunkType::min(), range.getMin()); b.append(ChunkType::max(), range.getMax()); collVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod()); } if (newChunks.size() == 2) { appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]); appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]); Grid::get(opCtx) ->catalogClient(opCtx) ->logChange(opCtx, "split", ns.ns(), logDetail.obj(), WriteConcernOptions()) .transitional_ignore(); } else { BSONObj beforeDetailObj = logDetail.obj(); BSONObj firstDetailObj = beforeDetailObj.getOwned(); const int newChunksSize = newChunks.size(); for (int i = 0; i < newChunksSize; i++) { BSONObjBuilder chunkDetail; chunkDetail.appendElements(beforeDetailObj); chunkDetail.append("number", i + 1); chunkDetail.append("of", newChunksSize); appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]); Grid::get(opCtx) ->catalogClient(opCtx) ->logChange(opCtx, "multi-split", ns.ns(), chunkDetail.obj(), WriteConcernOptions()) .transitional_ignore(); } } return applyOpsStatus; } Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* opCtx, const NamespaceString& ns, const OID& requestEpoch, const std::vector& chunkBoundaries, const std::string& shardName) { // This method must never be called with empty chunks to merge invariant(!chunkBoundaries.empty()); // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ // move chunks on different collections to proceed in parallel Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); // Get the chunk with the highest version for this namespace auto findStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(ChunkType::ConfigNS), BSON("ns" << ns.ns()), BSON(ChunkType::DEPRECATED_lastmod << -1), 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } const auto& chunksVector = findStatus.getValue().docs; if (chunksVector.empty()) return {ErrorCodes::IllegalOperation, "collection does not exist, isn't sharded, or has no chunks"}; ChunkVersion collVersion = ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); // Return an error if epoch of chunk does not match epoch of request if (collVersion.epoch() != requestEpoch) { return {ErrorCodes::StaleEpoch, "epoch of chunk does not match epoch of request. This most likely means " "that the collection was dropped and re-created."}; } // Build chunks to be merged std::vector chunksToMerge; ChunkType itChunk; itChunk.setMax(chunkBoundaries.front()); itChunk.setNS(ns.ns()); itChunk.setShard(shardName); // Do not use the first chunk boundary as a max bound while building chunks for (size_t i = 1; i < chunkBoundaries.size(); ++i) { itChunk.setMin(itChunk.getMax()); // Ensure the chunk boundaries are strictly increasing if (chunkBoundaries[i].woCompare(itChunk.getMin()) <= 0) { return { ErrorCodes::InvalidOptions, str::stream() << "Chunk boundaries must be specified in strictly increasing order. Boundary " << chunkBoundaries[i] << " was specified after " << itChunk.getMin() << "."}; } itChunk.setMax(chunkBoundaries[i]); chunksToMerge.push_back(itChunk); } ChunkVersion mergeVersion = collVersion; mergeVersion.incMinor(); auto updates = buildMergeChunksApplyOpsUpdates(chunksToMerge, mergeVersion); auto preCond = buildMergeChunksApplyOpsPrecond(chunksToMerge, collVersion); // apply the batch of updates to remote and local metadata Status applyOpsStatus = Grid::get(opCtx)->catalogClient(opCtx)->applyChunkOpsDeprecated( opCtx, updates, preCond, ns.ns(), mergeVersion, WriteConcernOptions(), repl::ReadConcernLevel::kLocalReadConcern); if (!applyOpsStatus.isOK()) { return applyOpsStatus; } // log changes BSONObjBuilder logDetail; { BSONArrayBuilder b(logDetail.subarrayStart("merged")); for (auto chunkToMerge : chunksToMerge) { b.append(chunkToMerge.toConfigBSON()); } } collVersion.addToBSON(logDetail, "prevShardVersion"); mergeVersion.addToBSON(logDetail, "mergedVersion"); Grid::get(opCtx) ->catalogClient(opCtx) ->logChange(opCtx, "merge", ns.ns(), logDetail.obj(), WriteConcernOptions()) .transitional_ignore(); return applyOpsStatus; } StatusWith ShardingCatalogManagerImpl::commitChunkMigration( OperationContext* opCtx, const NamespaceString& nss, const ChunkType& migratedChunk, const boost::optional& controlChunk, const OID& collectionEpoch, const ShardId& fromShard, const ShardId& toShard) { auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations. // // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions // for migrated chunks are generated within the command and must be committed to the database // before another chunk commit generates new ChunkVersions in the same manner. // // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ // move chunks on different collections to proceed in parallel. // (Note: This is not needed while we have a global lock, taken here only for consistency.) Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); // Must use local read concern because we will perform subsequent writes. auto findResponse = configShard->exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, NamespaceString(ChunkType::ConfigNS), BSON("ns" << nss.ns()), BSON(ChunkType::DEPRECATED_lastmod << -1), 1); if (!findResponse.isOK()) { return findResponse.getStatus(); } if (MONGO_FAIL_POINT(migrationCommitVersionError)) { uassert(ErrorCodes::StaleEpoch, "failpoint 'migrationCommitVersionError' generated error", false); } const auto chunksVector = std::move(findResponse.getValue().docs); if (chunksVector.empty()) { return {ErrorCodes::IncompatibleShardingMetadata, str::stream() << "Tried to find max chunk version for collection '" << nss.ns() << ", but found no chunks"}; } const auto swChunk = ChunkType::fromConfigBSON(chunksVector.front()); if (!swChunk.isOK()) { return swChunk.getStatus(); } const auto currentCollectionVersion = swChunk.getValue().getVersion(); // It is possible for a migration to end up running partly without the protection of the // distributed lock if the config primary stepped down since the start of the migration and // failed to recover the migration. Check that the collection has not been dropped and recreated // since the migration began, unbeknown to the shard when the command was sent. if (currentCollectionVersion.epoch() != collectionEpoch) { return {ErrorCodes::StaleEpoch, str::stream() << "The collection '" << nss.ns() << "' has been dropped and recreated since the migration began." " The config server's collection version epoch is now '" << currentCollectionVersion.epoch().toString() << "', but the shard's is " << collectionEpoch.toString() << "'. Aborting migration commit for chunk (" << migratedChunk.getRange().toString() << ")."}; } // Check that migratedChunk and controlChunk are where they should be, on fromShard. auto migratedOnShard = checkChunkIsOnShard(opCtx, nss, migratedChunk.getMin(), migratedChunk.getMax(), fromShard); if (!migratedOnShard.isOK()) { return migratedOnShard; } if (controlChunk) { auto controlOnShard = checkChunkIsOnShard( opCtx, nss, controlChunk->getMin(), controlChunk->getMax(), fromShard); if (!controlOnShard.isOK()) { return controlOnShard; } } // Generate the new versions of migratedChunk and controlChunk. Migrating chunk's minor version // will be 0. ChunkType newMigratedChunk = migratedChunk; newMigratedChunk.setVersion(ChunkVersion( currentCollectionVersion.majorVersion() + 1, 0, currentCollectionVersion.epoch())); // Control chunk's minor version will be 1 (if control chunk is present). boost::optional newControlChunk = boost::none; if (controlChunk) { newControlChunk = controlChunk.get(); newControlChunk->setVersion(ChunkVersion( currentCollectionVersion.majorVersion() + 1, 1, currentCollectionVersion.epoch())); } auto command = makeCommitChunkApplyOpsCommand( nss, newMigratedChunk, newControlChunk, fromShard.toString(), toShard.toString()); StatusWith applyOpsCommandResponse = configShard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, nss.db().toString(), command, Shard::RetryPolicy::kIdempotent); if (!applyOpsCommandResponse.isOK()) { return applyOpsCommandResponse.getStatus(); } if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) { return applyOpsCommandResponse.getValue().commandStatus; } BSONObjBuilder result; newMigratedChunk.getVersion().appendWithFieldForCommands(&result, "migratedChunkVersion"); if (controlChunk) { newControlChunk->getVersion().appendWithFieldForCommands(&result, "controlChunkVersion"); } return result.obj(); } } // namespace mongo