author    | Nathan Myers <nathan.myers@10gen.com> | 2016-12-05 10:45:16 -0500
committer | Nathan Myers <nathan.myers@10gen.com> | 2016-12-05 10:45:16 -0500
commit    | 9b403d89bb81064a9d4813724d075c51b0121e97 (patch)
tree      | b4b689c5ad238a24213194c314d973def3583fdc
parent    | 2d235202ad823bb0d1b0d99cf55c1589e57d67b9 (diff)
SERVER-26562 Move CommitChunkMigration cmd impl to catalog, prep for unit tests
10 files changed, 446 insertions, 342 deletions
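Before the file-by-file diff, a quick illustration of the request shape after this change: the migrated and control chunks now travel as full ChunkType documents, and the shard builds the command through CommitChunkMigrationRequest::appendAsCommand. A minimal sketch, mirroring the usage in commit_chunk_migration_request_test.cpp below; the namespace, shard ids, and min/max keys are illustrative placeholders, not values from this patch:

```cpp
// Sketch only: builds a _configsvrCommitChunkMigration command body using the
// new ChunkType-based appendAsCommand signature introduced in this commit.
BSONObjBuilder builder;

ChunkType migratedChunk;
migratedChunk.setMin(BSON("x" << 0));   // placeholder min key
migratedChunk.setMax(BSON("x" << 10));  // placeholder max key

ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());

CommitChunkMigrationRequest::appendAsCommand(&builder,
                                             NamespaceString("db.coll"),
                                             ShardId("shard0000"),  // fromShard
                                             ShardId("shard0001"),  // toShard
                                             migratedChunk,
                                             boost::none,  // no control chunk left behind
                                             fromShardCollectionVersion,
                                             false /* shardHasDistributedLock */);

// The resulting document carries _configsvrCommitChunkMigration, fromShard,
// toShard, migratedChunk, fromShardCollectionVersion, and shardHasDistributedLock.
BSONObj cmd = builder.obj();
```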
diff --git a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
index 0db1f4eae09..10da0444a4a 100644
--- a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
+++ b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
@@ -30,6 +30,7 @@
 #include "mongo/platform/basic.h"
 
+#include "mongo/base/status_with.h"
 #include "mongo/db/auth/authorization_session.h"
 #include "mongo/db/commands.h"
 #include "mongo/db/concurrency/d_concurrency.h"
@@ -38,20 +39,21 @@
 #include "mongo/db/s/chunk_move_write_concern_options.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/catalog/type_locks.h"
 #include "mongo/s/chunk_version.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/request_types/commit_chunk_migration_request_type.h"
+#include "mongo/util/fail_point.h"
 #include "mongo/util/fail_point_service.h"
 #include "mongo/util/log.h"
 
 namespace mongo {
 namespace {
 
-MONGO_FP_DECLARE(migrationCommitError);
-MONGO_FP_DECLARE(migrationCommitVersionError);
+MONGO_FP_DECLARE(migrationCommitError);  // delete this in 3.5
 
 /**
  * This command takes the chunk being migrated ("migratedChunk") and generates a new version for it
@@ -71,12 +73,12 @@ MONGO_FP_DECLARE(migrationCommitVersionError);
  * Command Format:
  * {
  *   _configsvrCommitChunkMigration: <database>.<collection>,
- *   migratedChunk: {min: <min_value>, max: <max_value>},
- *   controlChunk: {min: <min_value>, max: <max_value>},  (optional)
  *   fromShard: "<from_shard_name>",
 *   toShard: "<to_shard_name>",
- *   fromShardCollectionVersion: <chunk_version>,
- *   shardHasDistributedLock: true/false,
+ *   migratedChunk: {min: <min_value>, max: <max_value>, etc. },
+ *   controlChunk: {min: <min_value>, max: <max_value>, etc. },  (optional)
+ *   fromShardCollectionVersion: { shardVersionField: <version> },  (for backward compatibility only)
+ *   shardHasDistributedLock: true/false  (for backward compatibility only)
 * }
 *
 * Returns:
@@ -121,158 +123,18 @@ public:
     }
 
     /**
-     * Checks that the epoch in the version the shard sent with the command matches the epoch of the
-     * collection version found on the config server. It is possible for a migration to end up
-     * running partly without the protection of the distributed lock. This function checks that the
-     * collection has not been dropped and recreated since the migration began, unbeknown to the
-     * shard when the command was sent.
-     */
-    void checkCollectionVersionEpoch(OperationContext* txn,
-                                     const NamespaceString& nss,
-                                     const ChunkRange& chunkRange,
-                                     const ChunkVersion& shardCollectionVersion) {
-        auto findResponse = uassertStatusOK(
-            Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-                txn,
-                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                repl::ReadConcernLevel::kLocalReadConcern,
-                NamespaceString(ChunkType::ConfigNS),
-                BSON(ChunkType::ns() << nss.ns()),
-                BSONObj(),
-                1));
-
-        if (MONGO_FAIL_POINT(migrationCommitVersionError)) {
-            uassert(ErrorCodes::StaleEpoch,
-                    "failpoint 'migrationCommitVersionError' generated error",
-                    false);
-        }
-
-        uassert(ErrorCodes::IncompatibleShardingMetadata,
-                str::stream()
-                    << "Could not find any chunks for collection '"
-                    << nss.ns()
-                    << "'. The collection has been dropped since the migration began. Aborting "
-                    << "migration commit for chunk ("
-                    << redact(chunkRange.toString())
-                    << ").",
-                !findResponse.docs.empty());
-
-        ChunkType chunk = uassertStatusOK(ChunkType::fromBSON(findResponse.docs.front()));
-
-        uassert(ErrorCodes::StaleEpoch,
-                str::stream() << "The collection '" << nss.ns()
-                              << "' has been dropped and recreated since the migration began."
-                              << " The config server's collection version epoch is now '"
-                              << chunk.getVersion().epoch().toString()
-                              << "', but the shard's is "
-                              << shardCollectionVersion.epoch().toString()
-                              << "'. Aborting migration commit for chunk ("
-                              << chunkRange.toString()
-                              << ").",
-                chunk.getVersion().hasEqualEpoch(shardCollectionVersion));
-    }
-
-    static void checkChunkIsOnShard(OperationContext* txn,
-                                    const NamespaceString& nss,
-                                    const BSONObj& min,
-                                    const BSONObj& max,
-                                    const ShardId& shard) {
-        BSONObj chunkQuery =
-            BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max
-                                 << ChunkType::shard()
-                                 << shard);
-        // Must use local read concern because we're going to perform subsequent writes.
-        auto findResponse = uassertStatusOK(
-            Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-                txn,
-                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                repl::ReadConcernLevel::kLocalReadConcern,
-                NamespaceString(ChunkType::ConfigNS),
-                chunkQuery,
-                BSONObj(),
-                1));
-        uassert(40165,
-                str::stream()
-                    << "Could not find the chunk ("
-                    << chunkQuery.toString()
-                    << ") on the shard. Cannot execute the migration commit with invalid chunks.",
-                !findResponse.docs.empty());
-    }
-
-    BSONObj makeCommitChunkApplyOpsCommand(
-        const NamespaceString& nss,
-        const ChunkRange& migratedChunkRange,
-        const boost::optional<ChunkRange>& controlChunkRange,
-        const ChunkVersion newMigratedChunkVersion,
-        const boost::optional<ChunkVersion> newControlChunkVersion,
-        StringData toShard,
-        StringData fromShard) {
-        // Update migratedChunk's version and shard.
-        BSONArrayBuilder updates;
-        {
-            BSONObjBuilder op;
-            op.append("op", "u");
-            op.appendBool("b", false);  // No upserting
-            op.append("ns", ChunkType::ConfigNS);
-
-            BSONObjBuilder n(op.subobjStart("o"));
-            n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin()));
-            newMigratedChunkVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
-            n.append(ChunkType::ns(), nss.ns());
-            n.append(ChunkType::min(), migratedChunkRange.getMin());
-            n.append(ChunkType::max(), migratedChunkRange.getMax());
-            n.append(ChunkType::shard(), toShard);
-            n.done();
-
-            BSONObjBuilder q(op.subobjStart("o2"));
-            q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin()));
-            q.done();
-
-            updates.append(op.obj());
-        }
-
-        // If we have a controlChunk, update its chunk version.
-        if (controlChunkRange) {
-            BSONObjBuilder op;
-            op.append("op", "u");
-            op.appendBool("b", false);
-            op.append("ns", ChunkType::ConfigNS);
-
-            BSONObjBuilder n(op.subobjStart("o"));
-            n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin()));
-            newControlChunkVersion->addToBSON(n, ChunkType::DEPRECATED_lastmod());
-            n.append(ChunkType::ns(), nss.ns());
-            n.append(ChunkType::min(), controlChunkRange->getMin());
-            n.append(ChunkType::max(), controlChunkRange->getMax());
-            n.append(ChunkType::shard(), fromShard);
-            n.done();
-
-            BSONObjBuilder q(op.subobjStart("o2"));
-            q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin()));
-            q.done();
-
-            updates.append(op.obj());
-        }
-
-        // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will
-        // fail because of the GlobalWrite lock CommitChunkMigration already holds. Replication will
-        // not be able to take the lock it requires.
-        return BSON("applyOps" << updates.arr());
-    }
-
-    /**
      * Assures that the balancer still holds the collection distributed lock for this collection. If
-     * it no longer does, uassert, because we don't know if the collection state has changed -- e.g.
+     * it no longer does, fail because we don't know if the collection state has changed -- e.g.
      * whether it was/is dropping, whether another imcompatible migration is running, etc..
      */
-    void checkBalancerHasDistLock(OperationContext* txn,
-                                  const NamespaceString& nss,
-                                  const ChunkRange& chunkRange) {
+    static Status checkBalancerHasDistLock(OperationContext* txn,
+                                           const NamespaceString& nss,
+                                           const ChunkType& chunk) {
         auto balancerDistLockProcessID =
             Grid::get(txn)->catalogClient(txn)->getDistLockManager()->getProcessID();
 
         // Must use local read concern because we're going to perform subsequent writes.
-        auto lockQueryResponse = uassertStatusOK(
+        auto lockQueryResponseWith =
             Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
                 txn,
                 ReadPreferenceSetting{ReadPreference::PrimaryOnly},
@@ -280,21 +142,27 @@ public:
                 NamespaceString(LocksType::ConfigNS),
                 BSON(LocksType::process(balancerDistLockProcessID) << LocksType::name(nss.ns())),
                 BSONObj(),
-                boost::none));
+                boost::none);
+        if (!lockQueryResponseWith.isOK()) {
+            return lockQueryResponseWith.getStatus();
+        }
 
-        invariant(lockQueryResponse.docs.size() <= 1);
+        invariant(lockQueryResponseWith.getValue().docs.size() <= 1);
 
         if (MONGO_FAIL_POINT(migrationCommitError)) {
-            lockQueryResponse.docs.clear();
+            lockQueryResponseWith.getValue().docs.clear();
        }
 
-        uassert(ErrorCodes::BalancerLostDistributedLock,
+        if (lockQueryResponseWith.getValue().docs.size() != 1) {
+            return Status(
+                ErrorCodes::BalancerLostDistributedLock,
                str::stream() << "The distributed lock for collection '" << nss.ns()
                              << "' was lost by the balancer since this migration began. Cannot "
                              << "proceed with the migration commit for chunk ("
-                              << chunkRange.toString()
-                              << ") because it could corrupt other operations.",
-                lockQueryResponse.docs.size() == 1);
+                              << chunk.getRange().toString()
+                              << ") because it could corrupt other operations.");
+        }
+        return Status::OK();
     }
 
     bool run(OperationContext* txn,
@@ -303,108 +171,31 @@ public:
              int options,
             std::string& errmsg,
             BSONObjBuilder& result) override {
+
        const NamespaceString nss = NamespaceString(parseNs(dbName, cmdObj));
 
-        CommitChunkMigrationRequest commitChunkMigrationRequest =
+        auto commitRequest =
            uassertStatusOK(CommitChunkMigrationRequest::createFromCommand(nss, cmdObj));
 
-        // Run operations under a nested lock as a hack to prevent yielding. When query/applyOps
-        // commands are called, they will take a second lock, and the PlanExecutor will be unable to
-        // yield.
-        //
-        // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions
-        // for migrated chunks are generated within the command. Therefore it cannot be allowed to
-        // yield between generating the ChunkVersion and committing it to the database with
-        // applyOps.
-        Lock::GlobalWrite firstGlobalWriteLock(txn->lockState());
-
-        if (!commitChunkMigrationRequest.shardHasDistributedLock()) {
-            checkBalancerHasDistLock(txn, nss, commitChunkMigrationRequest.getMigratedChunkRange());
+        if (!commitRequest.shardHasDistributedLock()) {
+            auto check = checkBalancerHasDistLock(txn, nss, commitRequest.getMigratedChunk());
+            if (!check.isOK()) {
+                return appendCommandStatus(result, check);
+            }
        }
 
-        checkCollectionVersionEpoch(txn,
-                                    nss,
-                                    commitChunkMigrationRequest.getMigratedChunkRange(),
-                                    commitChunkMigrationRequest.getFromShardCollectionVersion());
-
-        // Check that migratedChunk and controlChunk are where they should be, on fromShard.
-        checkChunkIsOnShard(txn,
-                            nss,
-                            commitChunkMigrationRequest.getMigratedChunkRange().getMin(),
-                            commitChunkMigrationRequest.getMigratedChunkRange().getMax(),
-                            commitChunkMigrationRequest.getFromShard());
-
-        if (commitChunkMigrationRequest.hasControlChunkRange()) {
-            checkChunkIsOnShard(txn,
-                                nss,
-                                commitChunkMigrationRequest.getControlChunkRange().getMin(),
-                                commitChunkMigrationRequest.getControlChunkRange().getMax(),
-                                commitChunkMigrationRequest.getFromShard());
+        StatusWith<BSONObj> response = Grid::get(txn)->catalogManager()->commitChunkMigration(
+            txn,
+            nss,
+            commitRequest.getMigratedChunk(),
+            commitRequest.getControlChunk(),
+            commitRequest.getCollectionEpoch(),
+            commitRequest.getFromShard(),
+            commitRequest.getToShard());
+        if (!response.isOK()) {
+            return appendCommandStatus(result, response.getStatus());
        }
-
-        // Generate the new chunk version (CV). Query the current max CV of the collection. Use the
-        // incremented major version of the result returned. Migrating chunk's minor version will
-        // be 0, control chunk's minor version will be 1 (if control chunk is present).
-
-        // Must use local read concern because we're going to perform subsequent writes.
-        auto findResponse = uassertStatusOK(
-            Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-                txn,
-                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                repl::ReadConcernLevel::kLocalReadConcern,
-                NamespaceString(ChunkType::ConfigNS),
-                BSON("ns" << nss.ns()),
-                BSON(ChunkType::DEPRECATED_lastmod << -1),
-                1));
-
-        std::vector<BSONObj> chunksVector = findResponse.docs;
-        uassert(40164,
-                str::stream() << "Tried to find max chunk version for collection '" << nss.ns()
-                              << ", but found no chunks",
-                !chunksVector.empty());
-
-        ChunkVersion currentMaxVersion =
-            ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
-
-        // Generate the new versions of migratedChunk and controlChunk.
-        ChunkVersion newMigratedChunkVersion =
-            ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch());
-        boost::optional<ChunkVersion> newControlChunkVersion = boost::none;
-        boost::optional<ChunkRange> newControlChunkRange = boost::none;
-        if (commitChunkMigrationRequest.hasControlChunkRange()) {
-            newControlChunkVersion =
-                ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch());
-            newControlChunkRange = commitChunkMigrationRequest.getControlChunkRange();
-        }
-
-        auto applyOpsCommandResponse =
-            Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
-                txn,
-                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-                nss.db().toString(),
-                makeCommitChunkApplyOpsCommand(
-                    nss,
-                    commitChunkMigrationRequest.getMigratedChunkRange(),
-                    newControlChunkRange,
-                    newMigratedChunkVersion,
-                    newControlChunkVersion,
-                    commitChunkMigrationRequest.getToShard().toString(),
-                    commitChunkMigrationRequest.getFromShard().toString()),
-                Shard::RetryPolicy::kIdempotent);
-
-        if (!applyOpsCommandResponse.isOK()) {
-            return appendCommandStatus(result, applyOpsCommandResponse.getStatus());
-        }
-
-        if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) {
-            return appendCommandStatus(result, applyOpsCommandResponse.getValue().commandStatus);
-        }
-
-        newMigratedChunkVersion.appendWithFieldForCommands(&result, "migratedChunkVersion");
-        if (commitChunkMigrationRequest.hasControlChunkRange()) {
-            newControlChunkVersion->appendWithFieldForCommands(&result, "controlChunkVersion");
-        }
-
+        result.appendElements(response.getValue());
         return true;
     }
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index 63f242d3598..da7709369b9 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -25,7 +25,6 @@
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */
-
 #pragma once
 
 #include <string>
@@ -43,6 +42,7 @@ class OperationContext;
 class RemoteCommandTargeter;
 class ShardId;
 class ShardType;
+class ChunkType;
 class Status;
 template <typename T>
 class StatusWith;
@@ -138,8 +138,8 @@ public:
                                     const ChunkRange& range) = 0;
 
     /**
-     * Updates chunk metadata in config.chunks collection to reflect the given chunk being split
-     * into multiple smaller chunks based on the specified split points.
+     * Updates metadata in config.chunks collection to show the given chunk as split
+     * into smaller chunks at the specified split points.
     */
    virtual Status commitChunkSplit(OperationContext* txn,
                                    const NamespaceString& ns,
@@ -149,8 +149,8 @@ public:
 
    /**
-     * Updates chunk metadata in config.chunks collection to reflect the given chunks being merged
-     * into a single larger chunk based on the specified boundaries of the smaller chunks.
+     * Updates metadata in config.chunks collection so the chunks with given boundaries are seen
+     * merged into a single larger chunk.
     */
    virtual Status commitChunkMerge(OperationContext* txn,
                                    const NamespaceString& ns,
@@ -159,6 +159,17 @@ public:
                                    const std::string& shardName) = 0;
 
    /**
+     * Updates metadata in config.chunks collection to show the given chunk in its new shard.
+     */
+    virtual StatusWith<BSONObj> commitChunkMigration(OperationContext* txn,
+                                                     const NamespaceString& nss,
+                                                     const ChunkType& migratedChunk,
+                                                     const boost::optional<ChunkType>& controlChunk,
+                                                     const OID& collectionEpoch,
+                                                     const ShardId& fromShard,
+                                                     const ShardId& toShard) = 0;
+
+    /**
     * Append information about the connection pools owned by the CatalogManager.
     */
    virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
index 17b3235256c..c519dba4b74 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
@@ -34,6 +34,7 @@
 #include <iomanip>
 
+#include "mongo/base/error_codes.h"
 #include "mongo/base/status.h"
 #include "mongo/base/status_with.h"
 #include "mongo/bson/bsonobjbuilder.h"
@@ -61,6 +62,7 @@
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog/config_server_version.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/catalog/type_collection.h"
 #include "mongo/s/catalog/type_config_version.h"
 #include "mongo/s/catalog/type_database.h"
@@ -86,6 +88,7 @@ namespace mongo {
 
 MONGO_FP_DECLARE(dontUpsertShardIdentityOnNewShards);
+MONGO_FP_DECLARE(migrationCommitVersionError);
 
 using std::string;
 using std::vector;
@@ -1473,6 +1476,279 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
     return applyOpsStatus;
 }
 
+namespace {
+
+/**
+ * Checks that the epoch in the version the shard sent with the command matches the epoch of the
+ * collection version found on the config server. It is possible for a migration to end up
+ * running partly without the protection of the distributed lock. This function checks that the
+ * collection has not been dropped and recreated since the migration began, unbeknown to the
+ * shard when the command was sent.
+ */
+static Status checkCollectionVersionEpoch(OperationContext* txn,
+                                          const NamespaceString& nss,
+                                          const ChunkType& aChunk,
+                                          const OID& collectionEpoch) {
+    auto findResponseWith =
+        Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+            txn,
+            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+            repl::ReadConcernLevel::kLocalReadConcern,
+            NamespaceString(ChunkType::ConfigNS),
+            BSON(ChunkType::ns() << nss.ns()),
+            BSONObj(),
+            1);
+    if (!findResponseWith.isOK()) {
+        return findResponseWith.getStatus();
+    }
+
+    if (MONGO_FAIL_POINT(migrationCommitVersionError)) {
+        uassert(ErrorCodes::StaleEpoch,
+                "failpoint 'migrationCommitVersionError' generated error",
+                false);
+    }
+
+    if (findResponseWith.getValue().docs.empty()) {
+        return Status(
+            ErrorCodes::IncompatibleShardingMetadata,
+            str::stream()
+                << "Could not find any chunks for collection '"
+                << nss.ns()
+                << "'. The collection has been dropped since the migration began. Aborting"
+                   " migration commit for chunk ("
+                << redact(aChunk.getRange().toString())
+                << ").");
+    }
+
+    auto chunkWith = ChunkType::fromBSON(findResponseWith.getValue().docs.front());
+    if (!chunkWith.isOK()) {
+        return chunkWith.getStatus();
+    } else if (chunkWith.getValue().getVersion().epoch() != collectionEpoch) {
+        return Status(ErrorCodes::StaleEpoch,
+                      str::stream() << "The collection '" << nss.ns()
+                                    << "' has been dropped and recreated since the migration began."
+                                       " The config server's collection version epoch is now '"
+                                    << chunkWith.getValue().getVersion().epoch().toString()
+                                    << "', but the shard's is "
+                                    << collectionEpoch.toString()
+                                    << "'. Aborting migration commit for chunk ("
+                                    << redact(aChunk.getRange().toString())
+                                    << ").");
+    }
+    return Status::OK();
+}
+
+static Status checkChunkIsOnShard(OperationContext* txn,
+                                  const NamespaceString& nss,
+                                  const BSONObj& min,
+                                  const BSONObj& max,
+                                  const ShardId& shard) {
+    BSONObj chunkQuery =
+        BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max
+                             << ChunkType::shard()
+                             << shard);
+    // Must use local read concern because we're going to perform subsequent writes.
+    auto findResponseWith =
+        Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+            txn,
+            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+            repl::ReadConcernLevel::kLocalReadConcern,
+            NamespaceString(ChunkType::ConfigNS),
+            chunkQuery,
+            BSONObj(),
+            1);
+    if (!findResponseWith.isOK()) {
+        return findResponseWith.getStatus();
+    }
+    if (findResponseWith.getValue().docs.empty()) {
+        return Status(
+            ErrorCodes::Error(40165),
+            str::stream()
+                << "Could not find the chunk ("
+                << chunkQuery.toString()
+                << ") on the shard. Cannot execute the migration commit with invalid chunks.");
+    }
+    return Status::OK();
+}
+
+static BSONObj makeCommitChunkApplyOpsCommand(const NamespaceString& nss,
+                                              const ChunkType& migratedChunk,
+                                              const boost::optional<ChunkType>& controlChunk,
+                                              StringData fromShard,
+                                              StringData toShard) {
+
+    // Update migratedChunk's version and shard.
+    BSONArrayBuilder updates;
+    {
+        BSONObjBuilder op;
+        op.append("op", "u");
+        op.appendBool("b", false);  // No upserting
+        op.append("ns", ChunkType::ConfigNS);
+
+        BSONObjBuilder n(op.subobjStart("o"));
+        n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin()));
+        migratedChunk.getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod());
+        n.append(ChunkType::ns(), nss.ns());
+        n.append(ChunkType::min(), migratedChunk.getMin());
+        n.append(ChunkType::max(), migratedChunk.getMax());
+        n.append(ChunkType::shard(), toShard);
+        n.done();
+
+        BSONObjBuilder q(op.subobjStart("o2"));
+        q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin()));
+        q.done();
+
+        updates.append(op.obj());
+    }
+
+    // If we have a controlChunk, update its chunk version.
+    if (controlChunk) {
+        BSONObjBuilder op;
+        op.append("op", "u");
+        op.appendBool("b", false);
+        op.append("ns", ChunkType::ConfigNS);
+
+        BSONObjBuilder n(op.subobjStart("o"));
+        n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin()));
+        controlChunk->getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod());
+        n.append(ChunkType::ns(), nss.ns());
+        n.append(ChunkType::min(), controlChunk->getMin());
+        n.append(ChunkType::max(), controlChunk->getMax());
+        n.append(ChunkType::shard(), fromShard);
+        n.done();
+
+        BSONObjBuilder q(op.subobjStart("o2"));
+        q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin()));
+        q.done();
+
+        updates.append(op.obj());
+    }
+
+    // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will
+    // fail because of the GlobalWrite lock CommitChunkMigration already holds. Replication will
+    // not be able to take the lock it requires.
+    return BSON("applyOps" << updates.arr());
+}
+
+}  // namespace
+
+StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration(
+    OperationContext* txn,
+    const NamespaceString& nss,
+    const ChunkType& migratedChunk,
+    const boost::optional<ChunkType>& controlChunk,
+    const OID& collectionEpoch,
+    const ShardId& fromShard,
+    const ShardId& toShard) {
+
+    // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+    // migrations.
+    // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/
+    // move chunks on different collections to proceed in parallel.
+    // (Note: This is not needed while we have a global lock, taken here only for consistency.)
+    Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
+
+    // Acquire GlobalLock in MODE_X twice to prevent yielding.
+    // Run operations under a nested lock as a hack to prevent yielding. When query/applyOps
+    // commands are called, they will take a second lock, and the PlanExecutor will be unable to
+    // yield.
+    //
+    // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions
+    // for migrated chunks are generated within the command. Therefore it cannot be allowed to
+    // yield between generating the ChunkVersion and committing it to the database with
+    // applyOps.
+
+    Lock::GlobalWrite firstGlobalWriteLock(txn->lockState());
+
+    // Ensure that the epoch passed in still matches the real state of the database.
+
+    auto epochCheck = checkCollectionVersionEpoch(txn, nss, migratedChunk, collectionEpoch);
+    if (!epochCheck.isOK()) {
+        return epochCheck;
+    }
+
+    // Check that migratedChunk and controlChunk are where they should be, on fromShard.
+
+    auto migratedOnShard =
+        checkChunkIsOnShard(txn, nss, migratedChunk.getMin(), migratedChunk.getMax(), fromShard);
+    if (!migratedOnShard.isOK()) {
+        return migratedOnShard;
+    }
+
+    if (controlChunk) {
+        auto controlOnShard = checkChunkIsOnShard(
+            txn, nss, controlChunk->getMin(), controlChunk->getMax(), fromShard);
+        if (!controlOnShard.isOK()) {
+            return controlOnShard;
+        }
+    }
+
+    // Must use local read concern because we will perform subsequent writes.
+    auto findResponse = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+        txn,
+        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+        repl::ReadConcernLevel::kLocalReadConcern,
+        NamespaceString(ChunkType::ConfigNS),
+        BSON("ns" << nss.ns()),
+        BSON(ChunkType::DEPRECATED_lastmod << -1),
+        1);
+    if (!findResponse.isOK()) {
+        return findResponse.getStatus();
+    }
+
+    std::vector<BSONObj> chunksVector = std::move(findResponse.getValue().docs);
+    if (chunksVector.empty()) {
+        return Status(ErrorCodes::Error(40164),
+                      str::stream() << "Tried to find max chunk version for collection '"
+                                    << nss.ns()
+                                    << ", but found no chunks");
+    }
+
+    ChunkVersion currentMaxVersion =
+        ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
+
+    // Use the incremented major version of the result returned.
+
+    // Generate the new versions of migratedChunk and controlChunk.
+    // Migrating chunk's minor version will be 0.
+    ChunkType newMigratedChunk = migratedChunk;
+    newMigratedChunk.setVersion(
+        ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch()));
+
+    // Control chunk's minor version will be 1 (if control chunk is present).
+    boost::optional<ChunkType> newControlChunk = boost::none;
+    if (controlChunk) {
+        newControlChunk = controlChunk.get();
+        newControlChunk->setVersion(
+            ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch()));
+    }
+
+    auto command = makeCommitChunkApplyOpsCommand(
+        nss, newMigratedChunk, newControlChunk, fromShard.toString(), toShard.toString());
+
+    StatusWith<Shard::CommandResponse> applyOpsCommandResponse =
+        Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+            txn,
+            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+            nss.db().toString(),
+            command,
+            Shard::RetryPolicy::kIdempotent);
+
+    if (!applyOpsCommandResponse.isOK()) {
+        return applyOpsCommandResponse.getStatus();
+    }
+    if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) {
+        return applyOpsCommandResponse.getValue().commandStatus;
+    }
+
+    BSONObjBuilder result;
+    newMigratedChunk.getVersion().appendWithFieldForCommands(&result, "migratedChunkVersion");
+    if (controlChunk) {
+        newControlChunk->getVersion().appendWithFieldForCommands(&result, "controlChunkVersion");
+    }
+    return result.obj();
+}
+
 void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
     _executorForAddShard->appendConnectionStats(stats);
 }
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/sharding_catalog_manager_impl.h
index 5294e31ace7..744bc30b31f 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.h
@@ -43,6 +43,9 @@ class DatabaseType;
 class RemoteCommandTargeter;
 class ShardingCatalogClient;
 class VersionType;
+class ShardId;
+template <typename T>
+class StatusWith;
 
 namespace executor {
 class TaskExecutor;
@@ -100,6 +103,14 @@ public:
                            const std::vector<BSONObj>& chunkBoundaries,
                            const std::string& shardName) override;
 
+    StatusWith<BSONObj> commitChunkMigration(OperationContext* txn,
+                                             const NamespaceString& nss,
+                                             const ChunkType& migratedChunk,
+                                             const boost::optional<ChunkType>& controlChunk,
+                                             const OID& collectionEpoch,
+                                             const ShardId& fromShard,
+                                             const ShardId& toShard) override;
+
     void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
 
     Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index cc53dfbada1..44a0e884bb8 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -98,6 +98,17 @@ Status ShardingCatalogManagerMock::commitChunkMerge(OperationContext* txn,
     return {ErrorCodes::InternalError, "Method not implemented"};
 }
 
+StatusWith<BSONObj> ShardingCatalogManagerMock::commitChunkMigration(
+    OperationContext* txn,
+    const NamespaceString&,
+    const ChunkType&,
+    const boost::optional<ChunkType>&,
+    const OID& collectionEpoch,
+    const ShardId&,
+    const ShardId&) {
+    return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
 void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
 
 Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) {
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index 23ab27831da..9a970c3a7be 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -28,6 +28,8 @@
 
 #pragma once
 
+#include <boost/optional.hpp>
+#include "mongo/base/status_with.h"
 #include "mongo/bson/bsonobj.h"
 #include "mongo/db/s/type_shard_identity.h"
 #include "mongo/s/catalog/sharding_catalog_manager.h"
@@ -82,6 +84,14 @@ public:
                            const std::vector<BSONObj>& chunkBoundaries,
                            const std::string& shardName) override;
 
+    StatusWith<BSONObj> commitChunkMigration(OperationContext* txn,
+                                             const NamespaceString& nss,
+                                             const ChunkType& migratedChunk,
+                                             const boost::optional<ChunkType>& controlChunk,
+                                             const OID& collectionEpoch,
+                                             const ShardId& fromShard,
+                                             const ShardId& toShard) override;
+
     void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
 
     Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index a7a0e1f7e30..de468e21f79 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -151,6 +151,10 @@ public:
     }
     void setMax(const BSONObj& max);
 
+    ChunkRange getRange() const {
+        return ChunkRange(getMin(), getMax());
+    }
+
     bool isVersionSet() const {
         return _version.is_initialized();
     }
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
index f0563fa76e1..d9201693f2a 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
@@ -55,23 +55,23 @@ const char kConfigSvrCommitChunkMigration[] = "_configsvrCommitChunkMigration";
 TEST(CommitChunkMigrationRequest, WithControlChunk) {
     BSONObjBuilder builder;
 
-    ChunkType migratedChunkType;
-    migratedChunkType.setMin(kKey0);
-    migratedChunkType.setMax(kKey1);
+    ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
 
-    ChunkType controlChunkTypeTemp;
-    controlChunkTypeTemp.setMin(kKey2);
-    controlChunkTypeTemp.setMax(kKey3);
-    boost::optional<ChunkType> controlChunkType = std::move(controlChunkTypeTemp);
+    ChunkType migratedChunk;
+    migratedChunk.setMin(kKey0);
+    migratedChunk.setMax(kKey1);
 
-    ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
+    ChunkType controlChunk;
+    controlChunk.setMin(kKey2);
+    controlChunk.setMax(kKey3);
+    boost::optional<ChunkType> controlChunkOpt = controlChunk;
 
     CommitChunkMigrationRequest::appendAsCommand(&builder,
                                                  kNamespaceString,
                                                  kShardId0,
                                                  kShardId1,
-                                                 migratedChunkType,
-                                                 controlChunkType,
+                                                 migratedChunk,
+                                                 controlChunkOpt,
                                                  fromShardCollectionVersion,
                                                  kShardHasDistributedLock);
 
@@ -83,21 +83,21 @@ TEST(CommitChunkMigrationRequest, WithControlChunk) {
     ASSERT_EQ(kNamespaceString, request.getNss());
     ASSERT_EQ(kShardId0, request.getFromShard());
     ASSERT_EQ(kShardId1, request.getToShard());
-    ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunkRange().getMin());
-    ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunkRange().getMax());
-    ASSERT(request.hasControlChunkRange());
-    ASSERT_BSONOBJ_EQ(kKey2, request.getControlChunkRange().getMin());
-    ASSERT_BSONOBJ_EQ(kKey3, request.getControlChunkRange().getMax());
-    ASSERT_EQ(fromShardCollectionVersion, request.getFromShardCollectionVersion());
+    ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunk().getMin());
+    ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunk().getMax());
+    ASSERT(request.getControlChunk());
+    ASSERT_BSONOBJ_EQ(kKey2, request.getControlChunk()->getMin());
+    ASSERT_BSONOBJ_EQ(kKey3, request.getControlChunk()->getMax());
+    ASSERT_EQ(fromShardCollectionVersion.epoch(), request.getCollectionEpoch());
     ASSERT_EQ(kShardHasDistributedLock, request.shardHasDistributedLock());
 }
 
 TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
     BSONObjBuilder builder;
 
-    ChunkType migratedChunkType;
-    migratedChunkType.setMin(kKey0);
-    migratedChunkType.setMax(kKey1);
+    ChunkType migratedChunk;
+    migratedChunk.setMin(kKey0);
+    migratedChunk.setMax(kKey1);
 
     ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
 
@@ -105,7 +105,7 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
                                                  kNamespaceString,
                                                  kShardId0,
                                                  kShardId1,
-                                                 migratedChunkType,
+                                                 migratedChunk,
                                                  boost::none,
                                                  fromShardCollectionVersion,
                                                  kShardHasDistributedLock);
@@ -118,10 +118,10 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
     ASSERT_EQ(kNamespaceString, request.getNss());
     ASSERT_EQ(kShardId0, request.getFromShard());
     ASSERT_EQ(kShardId1, request.getToShard());
-    ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunkRange().getMin());
-    ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunkRange().getMax());
-    ASSERT(!request.hasControlChunkRange());
-    ASSERT_EQ(fromShardCollectionVersion, request.getFromShardCollectionVersion());
+    ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunk().getMin());
+    ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunk().getMax());
+    ASSERT(!request.getControlChunk());
+    ASSERT_EQ(fromShardCollectionVersion.epoch(), request.getCollectionEpoch());
     ASSERT_EQ(kShardHasDistributedLock, request.shardHasDistributedLock());
 }
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
index 94cfd070e7c..4b9fbfd149d 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
@@ -44,15 +44,22 @@ const char kFromShardCollectionVersion[] = "fromShardCollectionVersion";
 const char kShardHasDistributedLock[] = "shardHasDistributedLock";
 
 /**
- * Attempts to parse a ChunkRange from "field" in "source".
+ * Attempts to parse a (range-only!) ChunkType from "field" in "source".
 */
-StatusWith<ChunkRange> extractChunkRange(const BSONObj& source, StringData field) {
+StatusWith<ChunkType> extractChunk(const BSONObj& source, StringData field) {
     BSONElement fieldElement;
     auto status = bsonExtractTypedField(source, field, BSONType::Object, &fieldElement);
     if (!status.isOK())
         return status;
 
-    return ChunkRange::fromBSON(fieldElement.Obj());
+    auto rangeWith = ChunkRange::fromBSON(fieldElement.Obj());
+    if (!rangeWith.isOK())
+        return rangeWith.getStatus();
+
+    ChunkType chunk;
+    chunk.setMin(rangeWith.getValue().getMin());
+    chunk.setMax(rangeWith.getValue().getMax());
+    return chunk;
 }
 
 /**
@@ -79,12 +86,12 @@ StatusWith<ShardId> extractShardId(const BSONObj& source, StringData field) {
 
 StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromCommand(
     const NamespaceString& nss, const BSONObj& obj) {
-    auto migratedChunkRange = extractChunkRange(obj, kMigratedChunk);
-    if (!migratedChunkRange.isOK()) {
-        return migratedChunkRange.getStatus();
+    auto migratedChunk = extractChunk(obj, kMigratedChunk);
+    if (!migratedChunk.isOK()) {
+        return migratedChunk.getStatus();
     }
 
-    CommitChunkMigrationRequest request(nss, std::move(migratedChunkRange.getValue()));
+    CommitChunkMigrationRequest request(nss, std::move(migratedChunk.getValue()));
 
     {
         auto fromShard = extractShardId(obj, kFromShard);
@@ -107,23 +114,23 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC
 
     {
         // controlChunk is optional, so parse it if present.
         if (obj.hasField(kControlChunk)) {
-            auto controlChunkRange = extractChunkRange(obj, kControlChunk);
-            if (!controlChunkRange.isOK()) {
-                return controlChunkRange.getStatus();
+            auto controlChunk = extractChunk(obj, kControlChunk);
+            if (!controlChunk.isOK()) {
+                return controlChunk.getStatus();
             }
 
-            request._controlChunkRange = std::move(controlChunkRange.getValue());
+            request._controlChunk = std::move(controlChunk.getValue());
         }
     }
 
     {
         auto statusWithChunkVersion =
             ChunkVersion::parseFromBSONWithFieldForCommands(obj, kFromShardCollectionVersion);
-        if (statusWithChunkVersion.isOK()) {
-            request._fromShardCollectionVersion = std::move(statusWithChunkVersion.getValue());
-        } else if (statusWithChunkVersion != ErrorCodes::NoSuchKey) {
+        if (!statusWithChunkVersion.isOK()) {
             return statusWithChunkVersion.getStatus();
         }
+
+        request._collectionEpoch = statusWithChunkVersion.getValue().epoch();
     }
 
     {
@@ -137,37 +144,27 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC
 
     return request;
 }
 
-void CommitChunkMigrationRequest::appendAsCommand(
-    BSONObjBuilder* builder,
-    const NamespaceString& nss,
-    const ShardId& fromShard,
-    const ShardId& toShard,
-    const ChunkType& migratedChunkType,
-    const boost::optional<ChunkType>& controlChunkType,
-    const ChunkVersion& fromShardCollectionVersion,
-    const bool& shardHasDistributedLock) {
+void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder,
+                                                  const NamespaceString& nss,
+                                                  const ShardId& fromShard,
+                                                  const ShardId& toShard,
+                                                  const ChunkType& migratedChunk,
+                                                  const boost::optional<ChunkType>& controlChunk,
+                                                  const ChunkVersion& fromShardCollectionVersion,
+                                                  const bool& shardHasDistributedLock) {
     invariant(builder->asTempObj().isEmpty());
     invariant(nss.isValid());
 
     builder->append(kConfigSvrCommitChunkMigration, nss.ns());
     builder->append(kFromShard, fromShard.toString());
     builder->append(kToShard, toShard.toString());
-    builder->append(kMigratedChunk, migratedChunkType.toBSON());
+    builder->append(kMigratedChunk, migratedChunk.toBSON());
     fromShardCollectionVersion.appendWithFieldForCommands(builder, kFromShardCollectionVersion);
     builder->append(kShardHasDistributedLock, shardHasDistributedLock);
 
-    if (controlChunkType) {
-        builder->append(kControlChunk, controlChunkType->toBSON());
+    if (controlChunk) {
+        builder->append(kControlChunk, controlChunk->toBSON());
     }
 }
 
-const ChunkRange& CommitChunkMigrationRequest::getControlChunkRange() const {
-    invariant(_controlChunkRange);
-    return _controlChunkRange.get();
-}
-
-CommitChunkMigrationRequest::CommitChunkMigrationRequest(const NamespaceString& nss,
-                                                         const ChunkRange& range)
-    : _nss(nss), _migratedChunkRange(range) {}
-
 }  // namespace mongo
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.h b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
index 461278e429e..c1f97f5c7fc 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.h
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
@@ -38,8 +38,11 @@ namespace mongo {
 /**
 * Creates and parses commit chunk migration command BSON objects.
 */
-class CommitChunkMigrationRequest {
-public:
+struct CommitChunkMigrationRequest {
+
+    CommitChunkMigrationRequest(const NamespaceString& nss, const ChunkType& chunk)
+        : _nss(nss), _migratedChunk(chunk), _shardHasDistributedLock() {}
+
     /**
     * Parses the input command and produces a request corresponding to its arguments.
     */
@@ -63,35 +66,26 @@ public:
     const NamespaceString& getNss() const {
         return _nss;
     }
-
     const ShardId& getFromShard() const {
         return _fromShard;
     }
-
     const ShardId& getToShard() const {
         return _toShard;
     }
-
-    const ChunkRange& getMigratedChunkRange() const {
-        return _migratedChunkRange;
-    }
-
-    const ChunkRange& getControlChunkRange() const;
-
-    bool hasControlChunkRange() {
-        return bool(_controlChunkRange);
+    const ChunkType& getMigratedChunk() const {
+        return _migratedChunk;
     }
-
-    const ChunkVersion& getFromShardCollectionVersion() const {
-        return _fromShardCollectionVersion;
+    const boost::optional<ChunkType>& getControlChunk() const {
+        return _controlChunk;
     }
 
     bool shardHasDistributedLock() {
         return _shardHasDistributedLock;
     }
 
-private:
-    CommitChunkMigrationRequest(const NamespaceString& nss, const ChunkRange& range);
+    const OID& getCollectionEpoch() {
+        return _collectionEpoch;
+    }
 
     // The collection for which this request applies.
     NamespaceString _nss;
@@ -102,17 +96,16 @@ private:
     // The recipient shard name.
     ShardId _toShard;
 
-    // Range of migrated chunk being moved.
-    ChunkRange _migratedChunkRange;
-
-    // Range of control chunk being moved, if it exists.
-    boost::optional<ChunkRange> _controlChunkRange;
+    // The chunk being moved.
+    ChunkType _migratedChunk;
 
-    // Collection version of the source shard.
-    ChunkVersion _fromShardCollectionVersion;
+    // A chunk on the shard moved from, if any remain.
+    boost::optional<ChunkType> _controlChunk;
 
     // Flag to indicate whether the shard has the distlock.
     bool _shardHasDistributedLock;
+
+    OID _collectionEpoch;
 };
 
 }  // namespace mongo
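The commit title mentions prepping for unit tests: with the commit logic now behind ShardingCatalogManager, a test can call commitChunkMigration directly instead of dispatching the whole command. A hypothetical sketch of such a test; the CommitChunkMigrationTest fixture and its operationContext()/catalogManager() helpers are assumptions for illustration, not part of this patch:

```cpp
// Hypothetical test shape. Assumes a fixture exposing operationContext() and
// catalogManager(), with no chunk documents seeded on the config server.
TEST_F(CommitChunkMigrationTest, FailsWhenMetadataIsMissing) {
    ChunkType migratedChunk;
    migratedChunk.setMin(BSON("x" << 0));   // placeholder min key
    migratedChunk.setMax(BSON("x" << 10));  // placeholder max key

    auto response = catalogManager()->commitChunkMigration(operationContext(),
                                                           NamespaceString("db.coll"),
                                                           migratedChunk,
                                                           boost::none,  // controlChunk
                                                           OID::gen(),   // collectionEpoch
                                                           ShardId("shard0000"),
                                                           ShardId("shard0001"));

    // With an empty config.chunks, checkCollectionVersionEpoch rejects the commit
    // before the on-shard check, so the returned status is an error rather than
    // an applyOps result document.
    ASSERT(!response.isOK());
}
```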