diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-01-17 16:59:35 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-01-18 14:48:14 -0500 |
commit | 0fe3516e0db2fdc6b3602b0018ebbd9f62008bab (patch) | |
tree | f12d49b5645d5144f03a572118cca272d8cd86ab | |
parent | c43d7cd31af6854eaf134f8021d9f0784ddf5070 (diff) | |
download | mongo-0fe3516e0db2fdc6b3602b0018ebbd9f62008bab.tar.gz |
SERVER-27726 Split ShardingCatalogManagerImpl into multiple .cpp files
Gets rid of the ShardingCatalogManagerMock. No functional changes.
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/catalog/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager.h | 3 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp | 696 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_impl.cpp | 2011 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_impl.h | 80 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_mock.cpp | 144 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_mock.h | 115 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp | 1063 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_zone_operations_impl.cpp | 397 | ||||
-rw-r--r-- | src/mongo/s/config_server_test_fixture.cpp | 2 |
11 files changed, 2197 insertions, 2321 deletions
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index ebfed8383d1..efc67055f80 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -89,8 +89,7 @@ Status initializeGlobalShardingStateForMongod(OperationContext* txn, [](ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> executor) -> std::unique_ptr<ShardingCatalogManager> { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - return stdx::make_unique<ShardingCatalogManagerImpl>(catalogClient, - std::move(executor)); + return stdx::make_unique<ShardingCatalogManagerImpl>(std::move(executor)); } else { return nullptr; // Only config servers get a real ShardingCatalogManager } diff --git a/src/mongo/s/catalog/SConscript b/src/mongo/s/catalog/SConscript index c09305a2f7d..e06a518b2dc 100644 --- a/src/mongo/s/catalog/SConscript +++ b/src/mongo/s/catalog/SConscript @@ -18,7 +18,6 @@ env.Library( target='sharding_catalog_mock', source=[ 'sharding_catalog_client_mock.cpp', - 'sharding_catalog_manager_mock.cpp', ], LIBDEPS=[ 'dist_lock_manager_mock', @@ -143,7 +142,10 @@ env.Library( env.Library( target='sharding_catalog_manager_impl', source=[ + 'sharding_catalog_manager_chunk_operations_impl.cpp', 'sharding_catalog_manager_impl.cpp', + 'sharding_catalog_manager_shard_operations_impl.cpp', + 'sharding_catalog_manager_zone_operations_impl.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/db_raii', diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h index da7709369b9..bb7946de096 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager.h +++ b/src/mongo/s/catalog/sharding_catalog_manager.h @@ -25,9 +25,11 @@ * exception statement from all source files in the program, then also delete * it in the license file. 
*/ + #pragma once #include <string> +#include <vector> #include "mongo/base/disallow_copying.h" #include "mongo/stdx/memory.h" @@ -39,7 +41,6 @@ class ChunkRange; class ConnectionString; class NamespaceString; class OperationContext; -class RemoteCommandTargeter; class ShardId; class ShardType; class ChunkType; diff --git a/src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp new file mode 100644 index 00000000000..e6b7bd78f08 --- /dev/null +++ b/src/mongo/s/catalog/sharding_catalog_manager_chunk_operations_impl.cpp @@ -0,0 +1,696 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/sharding_catalog_manager_impl.h" + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/connection_string.h" +#include "mongo/client/read_preference.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_key_pattern.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace { + +MONGO_FP_DECLARE(migrationCommitVersionError); + +/** + * Append min, max and version information from chunk to the buffer for logChange purposes. 
+ */ +void appendShortVersion(BufBuilder* b, const ChunkType& chunk) { + BSONObjBuilder bb(*b); + bb.append(ChunkType::min(), chunk.getMin()); + bb.append(ChunkType::max(), chunk.getMax()); + if (chunk.isVersionSet()) + chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + bb.done(); +} + +BSONArray buildMergeChunksApplyOpsUpdates(const std::vector<ChunkType>& chunksToMerge, + const ChunkVersion& mergeVersion) { + BSONArrayBuilder updates; + + // Build an update operation to expand the first chunk into the newly merged chunk + { + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", false); // no upsert + op.append("ns", ChunkType::ConfigNS); + + // expand first chunk into newly merged chunk + ChunkType mergedChunk(chunksToMerge.front()); + mergedChunk.setMax(chunksToMerge.back().getMax()); + + // fill in additional details for sending through applyOps + mergedChunk.setVersion(mergeVersion); + + // add the new chunk information as the update object + op.append("o", mergedChunk.toBSON()); + + // query object + op.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); + + updates.append(op.obj()); + } + + // Build update operations to delete the rest of the chunks to be merged. 
Remember not + // to delete the first chunk we're expanding + for (size_t i = 1; i < chunksToMerge.size(); ++i) { + BSONObjBuilder op; + op.append("op", "d"); + op.append("ns", ChunkType::ConfigNS); + + op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName()))); + + updates.append(op.obj()); + } + + return updates.arr(); +} + +BSONArray buildMergeChunksApplyOpsPrecond(const std::vector<ChunkType>& chunksToMerge, + const ChunkVersion& collVersion) { + BSONArrayBuilder preCond; + + for (auto chunk : chunksToMerge) { + BSONObjBuilder b; + b.append("ns", ChunkType::ConfigNS); + b.append( + "q", + BSON("query" << BSON(ChunkType::ns(chunk.getNS()) << ChunkType::min(chunk.getMin()) + << ChunkType::max(chunk.getMax())) + << "orderby" + << BSON(ChunkType::DEPRECATED_lastmod() << -1))); + b.append("res", + BSON(ChunkType::DEPRECATED_epoch(collVersion.epoch()) + << ChunkType::shard(chunk.getShard().toString()))); + preCond.append(b.obj()); + } + return preCond.arr(); +} + +/** + * Checks that the epoch in the version the shard sent with the command matches the epoch of the + * collection version found on the config server. It is possible for a migration to end up running + * partly without the protection of the distributed lock. This function checks that the collection + * has not been dropped and recreated since the migration began, unbeknown to the shard when the + * command was sent. 
+ */ +Status checkCollectionVersionEpoch(OperationContext* txn, + const NamespaceString& nss, + const ChunkType& aChunk, + const OID& collectionEpoch) { + auto findResponseWith = + Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON(ChunkType::ns() << nss.ns()), + BSONObj(), + 1); + if (!findResponseWith.isOK()) { + return findResponseWith.getStatus(); + } + + if (MONGO_FAIL_POINT(migrationCommitVersionError)) { + uassert(ErrorCodes::StaleEpoch, + "failpoint 'migrationCommitVersionError' generated error", + false); + } + + if (findResponseWith.getValue().docs.empty()) { + return Status( + ErrorCodes::IncompatibleShardingMetadata, + str::stream() + << "Could not find any chunks for collection '" + << nss.ns() + << "'. The collection has been dropped since the migration began. Aborting" + " migration commit for chunk (" + << redact(aChunk.getRange().toString()) + << ")."); + } + + auto chunkWith = ChunkType::fromBSON(findResponseWith.getValue().docs.front()); + if (!chunkWith.isOK()) { + return chunkWith.getStatus(); + } else if (chunkWith.getValue().getVersion().epoch() != collectionEpoch) { + return Status(ErrorCodes::StaleEpoch, + str::stream() << "The collection '" << nss.ns() + << "' has been dropped and recreated since the migration began." + " The config server's collection version epoch is now '" + << chunkWith.getValue().getVersion().epoch().toString() + << "', but the shard's is " + << collectionEpoch.toString() + << "'. 
Aborting migration commit for chunk (" + << redact(aChunk.getRange().toString()) + << ")."); + } + return Status::OK(); +} + +Status checkChunkIsOnShard(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& min, + const BSONObj& max, + const ShardId& shard) { + BSONObj chunkQuery = + BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max + << ChunkType::shard() + << shard); + + // Must use local read concern because we're going to perform subsequent writes. + auto findResponseWith = + Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + chunkQuery, + BSONObj(), + 1); + if (!findResponseWith.isOK()) { + return findResponseWith.getStatus(); + } + + if (findResponseWith.getValue().docs.empty()) { + return {ErrorCodes::Error(40165), + str::stream() + << "Could not find the chunk (" + << chunkQuery.toString() + << ") on the shard. Cannot execute the migration commit with invalid chunks."}; + } + + return Status::OK(); +} + +BSONObj makeCommitChunkApplyOpsCommand(const NamespaceString& nss, + const ChunkType& migratedChunk, + const boost::optional<ChunkType>& controlChunk, + StringData fromShard, + StringData toShard) { + + // Update migratedChunk's version and shard. 
+ BSONArrayBuilder updates; + { + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", false); // No upserting + op.append("ns", ChunkType::ConfigNS); + + BSONObjBuilder n(op.subobjStart("o")); + n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin())); + migratedChunk.getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod()); + n.append(ChunkType::ns(), nss.ns()); + n.append(ChunkType::min(), migratedChunk.getMin()); + n.append(ChunkType::max(), migratedChunk.getMax()); + n.append(ChunkType::shard(), toShard); + n.done(); + + BSONObjBuilder q(op.subobjStart("o2")); + q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin())); + q.done(); + + updates.append(op.obj()); + } + + // If we have a controlChunk, update its chunk version. + if (controlChunk) { + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", false); + op.append("ns", ChunkType::ConfigNS); + + BSONObjBuilder n(op.subobjStart("o")); + n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin())); + controlChunk->getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod()); + n.append(ChunkType::ns(), nss.ns()); + n.append(ChunkType::min(), controlChunk->getMin()); + n.append(ChunkType::max(), controlChunk->getMax()); + n.append(ChunkType::shard(), fromShard); + n.done(); + + BSONObjBuilder q(op.subobjStart("o2")); + q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin())); + q.done(); + + updates.append(op.obj()); + } + + // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will fail + // because of the GlobalWrite lock CommitChunkMigration already holds. Replication will not be + // able to take the lock it requires. 
+ return BSON("applyOps" << updates.arr()); +} + +} // namespace + +Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const ChunkRange& range, + const std::vector<BSONObj>& splitPoints, + const std::string& shardName) { + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); + + // Get the chunk with highest version for this namespace + auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON("ns" << ns.ns()), + BSON(ChunkType::DEPRECATED_lastmod << -1), + 1); + + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& chunksVector = findStatus.getValue().docs; + if (chunksVector.empty()) + return {ErrorCodes::IllegalOperation, + "collection does not exist, isn't sharded, or has no chunks"}; + + ChunkVersion collVersion = + ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); + + // Return an error if epoch of chunk does not match epoch of request + if (collVersion.epoch() != requestEpoch) { + return {ErrorCodes::StaleEpoch, + "epoch of chunk does not match epoch of request. 
This most likely means " + "that the collection was dropped and re-created."}; + } + + std::vector<ChunkType> newChunks; + + ChunkVersion currentMaxVersion = collVersion; + + auto startKey = range.getMin(); + auto newChunkBounds(splitPoints); + newChunkBounds.push_back(range.getMax()); + + BSONArrayBuilder updates; + + for (const auto& endKey : newChunkBounds) { + // Verify the split points are all within the chunk + if (endKey.woCompare(range.getMax()) != 0 && !range.containsKey(endKey)) { + return {ErrorCodes::InvalidOptions, + str::stream() << "Split key " << endKey << " not contained within chunk " + << range.toString()}; + } + + // Verify the split points came in increasing order + if (endKey.woCompare(startKey) < 0) { + return { + ErrorCodes::InvalidOptions, + str::stream() << "Split keys must be specified in strictly increasing order. Key " + << endKey + << " was specified after " + << startKey + << "."}; + } + + // Verify that splitPoints are not repeated + if (endKey.woCompare(startKey) == 0) { + return {ErrorCodes::InvalidOptions, + str::stream() << "Split on lower bound of chunk " + << ChunkRange(startKey, endKey).toString() + << "is not allowed"}; + } + + // verify that splits don't create too-big shard keys + Status shardKeyStatus = ShardKeyPattern::checkShardKeySize(endKey); + if (!shardKeyStatus.isOK()) { + return shardKeyStatus; + } + + // splits only update the 'minor' portion of version + currentMaxVersion.incMinor(); + + // build an update operation against the chunks collection of the config database + // with upsert true + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", true); + op.append("ns", ChunkType::ConfigNS); + + // add the modified (new) chunk information as the update object + BSONObjBuilder n(op.subobjStart("o")); + n.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); + currentMaxVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); + n.append(ChunkType::ns(), ns.ns()); + n.append(ChunkType::min(), 
startKey); + n.append(ChunkType::max(), endKey); + n.append(ChunkType::shard(), shardName); + n.done(); + + // add the chunk's _id as the query part of the update statement + BSONObjBuilder q(op.subobjStart("o2")); + q.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); + q.done(); + + updates.append(op.obj()); + + // remember this chunk info for logging later + ChunkType chunk; + chunk.setMin(startKey); + chunk.setMax(endKey); + chunk.setVersion(currentMaxVersion); + + newChunks.push_back(std::move(chunk)); + + startKey = endKey; + } + + BSONArrayBuilder preCond; + { + BSONObjBuilder b; + b.append("ns", ChunkType::ConfigNS); + b.append("q", + BSON("query" << BSON(ChunkType::ns(ns.ns()) << ChunkType::min() << range.getMin() + << ChunkType::max() + << range.getMax()) + << "orderby" + << BSON(ChunkType::DEPRECATED_lastmod() << -1))); + { + BSONObjBuilder bb(b.subobjStart("res")); + bb.append(ChunkType::DEPRECATED_epoch(), requestEpoch); + bb.append(ChunkType::shard(), shardName); + } + preCond.append(b.obj()); + } + + // apply the batch of updates to remote and local metadata + Status applyOpsStatus = Grid::get(txn)->catalogClient(txn)->applyChunkOpsDeprecated( + txn, + updates.arr(), + preCond.arr(), + ns.ns(), + currentMaxVersion, + WriteConcernOptions(), + repl::ReadConcernLevel::kLocalReadConcern); + if (!applyOpsStatus.isOK()) { + return applyOpsStatus; + } + + // log changes + BSONObjBuilder logDetail; + { + BSONObjBuilder b(logDetail.subobjStart("before")); + b.append(ChunkType::min(), range.getMin()); + b.append(ChunkType::max(), range.getMax()); + collVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod()); + } + + if (newChunks.size() == 2) { + appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]); + appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]); + + Grid::get(txn)->catalogClient(txn)->logChange( + txn, "split", ns.ns(), logDetail.obj(), WriteConcernOptions()); + } else { + BSONObj beforeDetailObj = logDetail.obj(); 
+ BSONObj firstDetailObj = beforeDetailObj.getOwned(); + const int newChunksSize = newChunks.size(); + + for (int i = 0; i < newChunksSize; i++) { + BSONObjBuilder chunkDetail; + chunkDetail.appendElements(beforeDetailObj); + chunkDetail.append("number", i + 1); + chunkDetail.append("of", newChunksSize); + appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]); + + Grid::get(txn)->catalogClient(txn)->logChange( + txn, "multi-split", ns.ns(), chunkDetail.obj(), WriteConcernOptions()); + } + } + + return applyOpsStatus; +} + +Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const std::vector<BSONObj>& chunkBoundaries, + const std::string& shardName) { + // This method must never be called with empty chunks to merge + invariant(!chunkBoundaries.empty()); + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); + + // Get the chunk with the highest version for this namespace + auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON("ns" << ns.ns()), + BSON(ChunkType::DEPRECATED_lastmod << -1), + 1); + + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& chunksVector = findStatus.getValue().docs; + if (chunksVector.empty()) + return {ErrorCodes::IllegalOperation, + "collection does not exist, isn't sharded, or has no chunks"}; + + ChunkVersion collVersion = + ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); + + // Return an error if epoch of chunk does not match epoch 
of request + if (collVersion.epoch() != requestEpoch) { + return {ErrorCodes::StaleEpoch, + "epoch of chunk does not match epoch of request. This most likely means " + "that the collection was dropped and re-created."}; + } + + // Build chunks to be merged + std::vector<ChunkType> chunksToMerge; + + ChunkType itChunk; + itChunk.setMax(chunkBoundaries.front()); + itChunk.setNS(ns.ns()); + itChunk.setShard(shardName); + + // Do not use the first chunk boundary as a max bound while building chunks + for (size_t i = 1; i < chunkBoundaries.size(); ++i) { + itChunk.setMin(itChunk.getMax()); + + // Ensure the chunk boundaries are strictly increasing + if (chunkBoundaries[i].woCompare(itChunk.getMin()) <= 0) { + return { + ErrorCodes::InvalidOptions, + str::stream() + << "Chunk boundaries must be specified in strictly increasing order. Boundary " + << chunkBoundaries[i] + << " was specified after " + << itChunk.getMin() + << "."}; + } + + itChunk.setMax(chunkBoundaries[i]); + chunksToMerge.push_back(itChunk); + } + + ChunkVersion mergeVersion = collVersion; + mergeVersion.incMinor(); + + auto updates = buildMergeChunksApplyOpsUpdates(chunksToMerge, mergeVersion); + auto preCond = buildMergeChunksApplyOpsPrecond(chunksToMerge, collVersion); + + // apply the batch of updates to remote and local metadata + Status applyOpsStatus = Grid::get(txn)->catalogClient(txn)->applyChunkOpsDeprecated( + txn, + updates, + preCond, + ns.ns(), + mergeVersion, + WriteConcernOptions(), + repl::ReadConcernLevel::kLocalReadConcern); + if (!applyOpsStatus.isOK()) { + return applyOpsStatus; + } + + // log changes + BSONObjBuilder logDetail; + { + BSONArrayBuilder b(logDetail.subarrayStart("merged")); + for (auto chunkToMerge : chunksToMerge) { + b.append(chunkToMerge.toBSON()); + } + } + collVersion.addToBSON(logDetail, "prevShardVersion"); + mergeVersion.addToBSON(logDetail, "mergedVersion"); + + Grid::get(txn)->catalogClient(txn)->logChange( + txn, "merge", ns.ns(), logDetail.obj(), 
WriteConcernOptions()); + + return applyOpsStatus; +} + +StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration( + OperationContext* txn, + const NamespaceString& nss, + const ChunkType& migratedChunk, + const boost::optional<ChunkType>& controlChunk, + const OID& collectionEpoch, + const ShardId& fromShard, + const ShardId& toShard) { + + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations. + // + // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions + // for migrated chunks are generated within the command and must be committed to the database + // before another chunk commit generates new ChunkVersions in the same manner. + // + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel. + // (Note: This is not needed while we have a global lock, taken here only for consistency.) + Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); + + // Ensure that the epoch passed in still matches the real state of the database. + + auto epochCheck = checkCollectionVersionEpoch(txn, nss, migratedChunk, collectionEpoch); + if (!epochCheck.isOK()) { + return epochCheck; + } + + // Check that migratedChunk and controlChunk are where they should be, on fromShard. + + auto migratedOnShard = + checkChunkIsOnShard(txn, nss, migratedChunk.getMin(), migratedChunk.getMax(), fromShard); + if (!migratedOnShard.isOK()) { + return migratedOnShard; + } + + if (controlChunk) { + auto controlOnShard = checkChunkIsOnShard( + txn, nss, controlChunk->getMin(), controlChunk->getMax(), fromShard); + if (!controlOnShard.isOK()) { + return controlOnShard; + } + } + + // Must use local read concern because we will perform subsequent writes. 
+ auto findResponse = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON("ns" << nss.ns()), + BSON(ChunkType::DEPRECATED_lastmod << -1), + 1); + if (!findResponse.isOK()) { + return findResponse.getStatus(); + } + + std::vector<BSONObj> chunksVector = std::move(findResponse.getValue().docs); + if (chunksVector.empty()) { + return Status(ErrorCodes::Error(40164), + str::stream() << "Tried to find max chunk version for collection '" + << nss.ns() + << ", but found no chunks"); + } + + ChunkVersion currentMaxVersion = + ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); + + // Use the incremented major version of the result returned. + + // Generate the new versions of migratedChunk and controlChunk. + // Migrating chunk's minor version will be 0. + ChunkType newMigratedChunk = migratedChunk; + newMigratedChunk.setVersion( + ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch())); + + // Control chunk's minor version will be 1 (if control chunk is present). 
+ boost::optional<ChunkType> newControlChunk = boost::none; + if (controlChunk) { + newControlChunk = controlChunk.get(); + newControlChunk->setVersion( + ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch())); + } + + auto command = makeCommitChunkApplyOpsCommand( + nss, newMigratedChunk, newControlChunk, fromShard.toString(), toShard.toString()); + + StatusWith<Shard::CommandResponse> applyOpsCommandResponse = + Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + nss.db().toString(), + command, + Shard::RetryPolicy::kIdempotent); + + if (!applyOpsCommandResponse.isOK()) { + return applyOpsCommandResponse.getStatus(); + } + if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) { + return applyOpsCommandResponse.getValue().commandStatus; + } + + BSONObjBuilder result; + newMigratedChunk.getVersion().appendWithFieldForCommands(&result, "migratedChunkVersion"); + if (controlChunk) { + newControlChunk->getVersion().appendWithFieldForCommands(&result, "controlChunkVersion"); + } + return result.obj(); +} + +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp index b46a766d9e7..6c3e9ac749c 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp @@ -32,279 +32,33 @@ #include "mongo/s/catalog/sharding_catalog_manager_impl.h" -#include <iomanip> - -#include "mongo/base/error_codes.h" -#include "mongo/base/status.h" #include "mongo/base/status_with.h" -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/bson/util/bson_extract.h" -#include "mongo/client/connection_string.h" -#include "mongo/client/read_preference.h" -#include "mongo/client/remote_command_targeter.h" -#include "mongo/client/replica_set_monitor.h" -#include "mongo/db/client.h" -#include "mongo/db/commands.h" #include 
"mongo/db/commands/feature_compatibility_version.h" -#include "mongo/db/commands/feature_compatibility_version_command_parser.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/s/balancer/balancer_policy.h" #include "mongo/db/s/balancer/type_migration.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/type_shard_identity.h" -#include "mongo/db/wire_version.h" -#include "mongo/executor/network_interface.h" -#include "mongo/executor/task_executor.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/sharding_catalog_client.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_config_version.h" -#include "mongo/s/catalog/type_database.h" #include "mongo/s/catalog/type_lockpings.h" #include "mongo/s/catalog/type_locks.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/catalog/type_tags.h" -#include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/cluster_identity_loader.h" #include "mongo/s/grid.h" -#include "mongo/s/set_shard_version_request.h" -#include "mongo/s/shard_key_pattern.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/hostandport.h" -#include "mongo/util/scopeguard.h" namespace mongo { - -MONGO_FP_DECLARE(dontUpsertShardIdentityOnNewShards); -MONGO_FP_DECLARE(migrationCommitVersionError); - -using std::string; -using std::vector; -using str::stream; - namespace { -using CallbackHandle = executor::TaskExecutor::CallbackHandle; -using CallbackArgs = 
executor::TaskExecutor::CallbackArgs; -using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; -using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; - -const Seconds kDefaultFindHostMaxWaitTime(20); - -const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); -const ReadPreferenceSetting kConfigPrimarySelector(ReadPreference::PrimaryOnly); const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); -/** - * Append min, max and version information from chunk to the buffer for logChange purposes. -*/ -void _appendShortVersion(BufBuilder* b, const ChunkType& chunk) { - BSONObjBuilder bb(*b); - bb.append(ChunkType::min(), chunk.getMin()); - bb.append(ChunkType::max(), chunk.getMax()); - if (chunk.isVersionSet()) - chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod()); - bb.done(); -} - -/** - * Checks if the given key range for the given namespace conflicts with an existing key range. - * Note: range should have the full shard key. - * Returns ErrorCodes::RangeOverlapConflict is an overlap is detected. 
- */ -Status checkForOveralappedZonedKeyRange(OperationContext* txn, - Shard* configServer, - const NamespaceString& ns, - const ChunkRange& range, - const string& zoneName, - const KeyPattern& shardKeyPattern) { - DistributionStatus chunkDist(ns, ShardToChunksMap{}); - - auto tagStatus = configServer->exhaustiveFindOnConfig(txn, - kConfigPrimarySelector, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(TagsType::ConfigNS), - BSON(TagsType::ns(ns.ns())), - BSONObj(), - 0); - if (!tagStatus.isOK()) { - return tagStatus.getStatus(); - } - - const auto& tagDocList = tagStatus.getValue().docs; - for (const auto& tagDoc : tagDocList) { - auto tagParseStatus = TagsType::fromBSON(tagDoc); - if (!tagParseStatus.isOK()) { - return tagParseStatus.getStatus(); - } - - // Always extend ranges to full shard key to be compatible with tags created before - // the zone commands were implemented. - const auto& parsedTagDoc = tagParseStatus.getValue(); - auto overlapStatus = chunkDist.addRangeToZone( - ZoneRange(shardKeyPattern.extendRangeBound(parsedTagDoc.getMinKey(), false), - shardKeyPattern.extendRangeBound(parsedTagDoc.getMaxKey(), false), - parsedTagDoc.getTag())); - if (!overlapStatus.isOK()) { - return overlapStatus; - } - } - - auto overlapStatus = - chunkDist.addRangeToZone(ZoneRange(range.getMin(), range.getMax(), zoneName)); - if (!overlapStatus.isOK()) { - return overlapStatus; - } - - return Status::OK(); -} - -/** - * Returns a new range based on the given range with the full shard key. - * Returns: - * - ErrorCodes::NamespaceNotSharded if ns is not sharded. - * - ErrorCodes::ShardKeyNotFound if range is not compatible (for example, not a prefix of shard - * key) with the shard key of ns. 
- */ -StatusWith<ChunkRange> includeFullShardKey(OperationContext* txn, - Shard* configServer, - const NamespaceString& ns, - const ChunkRange& range, - KeyPattern* shardKeyPatternOut) { - auto findCollStatus = - configServer->exhaustiveFindOnConfig(txn, - kConfigPrimarySelector, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(CollectionType::ConfigNS), - BSON(CollectionType::fullNs(ns.ns())), - BSONObj(), - 1); - - if (!findCollStatus.isOK()) { - return findCollStatus.getStatus(); - } - - const auto& findCollResult = findCollStatus.getValue().docs; - - if (findCollResult.size() < 1) { - return {ErrorCodes::NamespaceNotSharded, str::stream() << ns.ns() << " is not sharded"}; - } - - auto parseStatus = CollectionType::fromBSON(findCollResult.front()); - if (!parseStatus.isOK()) { - return parseStatus.getStatus(); - } - - auto collDoc = parseStatus.getValue(); - if (collDoc.getDropped()) { - return {ErrorCodes::NamespaceNotSharded, str::stream() << ns.ns() << " is not sharded"}; - } - - const auto& shardKeyPattern = collDoc.getKeyPattern(); - const auto& shardKeyBSON = shardKeyPattern.toBSON(); - *shardKeyPatternOut = shardKeyPattern; - - if (!range.getMin().isFieldNamePrefixOf(shardKeyBSON)) { - return {ErrorCodes::ShardKeyNotFound, - str::stream() << "min: " << range.getMin() << " is not a prefix of the shard key " - << shardKeyBSON - << " of ns: " - << ns.ns()}; - } - - if (!range.getMax().isFieldNamePrefixOf(shardKeyBSON)) { - return {ErrorCodes::ShardKeyNotFound, - str::stream() << "max: " << range.getMax() << " is not a prefix of the shard key " - << shardKeyBSON - << " of ns: " - << ns.ns()}; - } - - return ChunkRange(shardKeyPattern.extendRangeBound(range.getMin(), false), - shardKeyPattern.extendRangeBound(range.getMax(), false)); -} - -BSONArray buildMergeChunksApplyOpsUpdates(const std::vector<ChunkType>& chunksToMerge, - const ChunkVersion& mergeVersion) { - BSONArrayBuilder updates; - - // Build an update operation to expand the first 
chunk into the newly merged chunk - { - BSONObjBuilder op; - op.append("op", "u"); - op.appendBool("b", false); // no upsert - op.append("ns", ChunkType::ConfigNS); - - // expand first chunk into newly merged chunk - ChunkType mergedChunk(chunksToMerge.front()); - mergedChunk.setMax(chunksToMerge.back().getMax()); - - // fill in additional details for sending through applyOps - mergedChunk.setVersion(mergeVersion); - - // add the new chunk information as the update object - op.append("o", mergedChunk.toBSON()); - - // query object - op.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); - - updates.append(op.obj()); - } - - // Build update operations to delete the rest of the chunks to be merged. Remember not - // to delete the first chunk we're expanding - for (size_t i = 1; i < chunksToMerge.size(); ++i) { - BSONObjBuilder op; - op.append("op", "d"); - op.append("ns", ChunkType::ConfigNS); - - op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName()))); - - updates.append(op.obj()); - } - - return updates.arr(); -} - -BSONArray buildMergeChunksApplyOpsPrecond(const std::vector<ChunkType>& chunksToMerge, - const ChunkVersion& collVersion) { - BSONArrayBuilder preCond; - - for (auto chunk : chunksToMerge) { - BSONObjBuilder b; - b.append("ns", ChunkType::ConfigNS); - b.append( - "q", - BSON("query" << BSON(ChunkType::ns(chunk.getNS()) << ChunkType::min(chunk.getMin()) - << ChunkType::max(chunk.getMax())) - << "orderby" - << BSON(ChunkType::DEPRECATED_lastmod() << -1))); - b.append("res", - BSON(ChunkType::DEPRECATED_epoch(collVersion.epoch()) - << ChunkType::shard(chunk.getShard().toString()))); - preCond.append(b.obj()); - } - return preCond.arr(); -} - } // namespace ShardingCatalogManagerImpl::ShardingCatalogManagerImpl( - ShardingCatalogClient* catalogClient, std::unique_ptr<executor::TaskExecutor> addShardExecutor) - : _catalogClient(catalogClient), - _executorForAddShard(std::move(addShardExecutor)), + std::unique_ptr<executor::TaskExecutor> 
addShardExecutor) + : _executorForAddShard(std::move(addShardExecutor)), _kZoneOpLock("zoneOpLock"), _kChunkOpLock("chunkOpLock"), _kShardMembershipLock("shardMembershipLock") {} @@ -332,1404 +86,6 @@ void ShardingCatalogManagerImpl::shutDown(OperationContext* txn) { _executorForAddShard->join(); } -StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAddShard( - OperationContext* txn, - RemoteCommandTargeter* targeter, - const std::string& dbName, - const BSONObj& cmdObj) { - auto host = targeter->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); - if (!host.isOK()) { - return host.getStatus(); - } - - executor::RemoteCommandRequest request( - host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); - executor::RemoteCommandResponse swResponse = - Status(ErrorCodes::InternalError, "Internal error running command"); - - auto callStatus = _executorForAddShard->scheduleRemoteCommand( - request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - swResponse = args.response; - }); - if (!callStatus.isOK()) { - return callStatus.getStatus(); - } - - // Block until the command is carried out - _executorForAddShard->wait(callStatus.getValue()); - - if (!swResponse.isOK()) { - if (swResponse.status.compareCode(ErrorCodes::ExceededTimeLimit)) { - LOG(0) << "Operation for addShard timed out with status " << swResponse.status; - } - if (!Shard::shouldErrorBePropagated(swResponse.status.code())) { - swResponse.status = {ErrorCodes::OperationFailed, - stream() << "failed to run command " << cmdObj - << " when attempting to add shard " - << targeter->connectionString().toString() - << causedBy(swResponse.status)}; - } - return swResponse.status; - } - - BSONObj responseObj = swResponse.data.getOwned(); - BSONObj responseMetadata = swResponse.metadata.getOwned(); - - Status commandStatus = getStatusFromCommandResult(responseObj); - if 
(!Shard::shouldErrorBePropagated(commandStatus.code())) { - commandStatus = {ErrorCodes::OperationFailed, - stream() << "failed to run command " << cmdObj - << " when attempting to add shard " - << targeter->connectionString().toString() - << causedBy(commandStatus)}; - } - - Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); - if (!Shard::shouldErrorBePropagated(writeConcernStatus.code())) { - writeConcernStatus = {ErrorCodes::OperationFailed, - stream() << "failed to satisfy writeConcern for command " << cmdObj - << " when attempting to add shard " - << targeter->connectionString().toString() - << causedBy(writeConcernStatus)}; - } - - return Shard::CommandResponse(std::move(responseObj), - std::move(responseMetadata), - std::move(commandStatus), - std::move(writeConcernStatus)); -} - -StatusWith<boost::optional<ShardType>> ShardingCatalogManagerImpl::_checkIfShardExists( - OperationContext* txn, - const ConnectionString& proposedShardConnectionString, - const std::string* proposedShardName, - long long proposedShardMaxSize) { - // Check whether any host in the connection is already part of the cluster. - const auto existingShards = - _catalogClient->getAllShards(txn, repl::ReadConcernLevel::kLocalReadConcern); - if (!existingShards.isOK()) { - return Status(existingShards.getStatus().code(), - str::stream() << "Failed to load existing shards during addShard" - << causedBy(existingShards.getStatus().reason())); - } - - // Now check if this shard already exists - if it already exists *with the same options* then - // the addShard request can return success early without doing anything more. 
- for (const auto& existingShard : existingShards.getValue().value) { - auto swExistingShardConnStr = ConnectionString::parse(existingShard.getHost()); - if (!swExistingShardConnStr.isOK()) { - return swExistingShardConnStr.getStatus(); - } - auto existingShardConnStr = std::move(swExistingShardConnStr.getValue()); - // Function for determining if the options for the shard that is being added match the - // options of an existing shard that conflicts with it. - auto shardsAreEquivalent = [&]() { - if (proposedShardName && *proposedShardName != existingShard.getName()) { - return false; - } - if (proposedShardConnectionString.type() != existingShardConnStr.type()) { - return false; - } - if (proposedShardConnectionString.type() == ConnectionString::SET && - proposedShardConnectionString.getSetName() != existingShardConnStr.getSetName()) { - return false; - } - if (proposedShardMaxSize != existingShard.getMaxSizeMB()) { - return false; - } - return true; - }; - - if (existingShardConnStr.type() == ConnectionString::SET && - proposedShardConnectionString.type() == ConnectionString::SET && - existingShardConnStr.getSetName() == proposedShardConnectionString.getSetName()) { - // An existing shard has the same replica set name as the shard being added. - // If the options aren't the same, then this is an error, - // but if the options match then the addShard operation should be immediately - // considered a success and terminated. - if (shardsAreEquivalent()) { - return {existingShard}; - } else { - return {ErrorCodes::IllegalOperation, - str::stream() << "A shard already exists containing the replica set '" - << existingShardConnStr.getSetName() - << "'"}; - } - } - - for (const auto& existingHost : existingShardConnStr.getServers()) { - // Look if any of the hosts in the existing shard are present within the shard trying - // to be added. 
- for (const auto& addingHost : proposedShardConnectionString.getServers()) { - if (existingHost == addingHost) { - // At least one of the hosts in the shard being added already exists in an - // existing shard. If the options aren't the same, then this is an error, - // but if the options match then the addShard operation should be immediately - // considered a success and terminated. - if (shardsAreEquivalent()) { - return {existingShard}; - } else { - return {ErrorCodes::IllegalOperation, - str::stream() << "'" << addingHost.toString() << "' " - << "is already a member of the existing shard '" - << existingShard.getHost() - << "' (" - << existingShard.getName() - << ")."}; - } - } - } - } - if (proposedShardName && *proposedShardName == existingShard.getName()) { - // If we get here then we're trying to add a shard with the same name as an existing - // shard, but there was no overlap in the hosts between the existing shard and the - // proposed connection string for the new shard. - return Status(ErrorCodes::IllegalOperation, - str::stream() << "A shard named " << *proposedShardName - << " already exists"); - } - } - return {boost::none}; -} - -StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard( - OperationContext* txn, - std::shared_ptr<RemoteCommandTargeter> targeter, - const std::string* shardProposedName, - const ConnectionString& connectionString) { - - // Check if the node being added is a mongos or a version of mongod too old to speak the current - // communication protocol. - auto swCommandResponse = - _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1)); - if (!swCommandResponse.isOK()) { - if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) { - // Mongos to mongos commands are no longer supported in the wire protocol - // (because mongos does not support OP_COMMAND), similarly for a new mongos - // and an old mongod. So the call will fail in such cases. 
- // TODO: If/When mongos ever supports opCommands, this logic will break because - // cmdStatus will be OK. - return {ErrorCodes::RPCProtocolNegotiationFailed, - str::stream() << targeter->connectionString().toString() - << " does not recognize the RPC protocol being used. This is" - << " likely because it contains a node that is a mongos or an old" - << " version of mongod."}; - } else { - return swCommandResponse.getStatus(); - } - } - - // Check for a command response error - auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus); - if (!resIsMasterStatus.isOK()) { - return {resIsMasterStatus.code(), - str::stream() << "Error running isMaster against " - << targeter->connectionString().toString() - << ": " - << causedBy(resIsMasterStatus)}; - } - - auto resIsMaster = std::move(swCommandResponse.getValue().response); - - // Check that the node being added is a new enough version. - // If we're running this code, that means the mongos that the addShard request originated from - // must be at least version 3.4 (since 3.2 mongoses don't know about the _configsvrAddShard - // command). Since it is illegal to have v3.4 mongoses with v3.2 shards, we should reject - // adding any shards that are not v3.4. We can determine this by checking that the - // maxWireVersion reported in isMaster is at least COMMANDS_ACCEPT_WRITE_CONCERN. - // TODO(SERVER-25623): This approach won't work to prevent v3.6 mongoses from adding v3.4 - // shards, so we'll have to rethink this during the 3.5 development cycle. 
- - long long maxWireVersion; - Status status = bsonExtractIntegerField(resIsMaster, "maxWireVersion", &maxWireVersion); - if (!status.isOK()) { - return Status(status.code(), - str::stream() << "isMaster returned invalid 'maxWireVersion' " - << "field when attempting to add " - << connectionString.toString() - << " as a shard: " - << status.reason()); - } - if (maxWireVersion < WireVersion::COMMANDS_ACCEPT_WRITE_CONCERN) { - return Status(ErrorCodes::IncompatibleServerVersion, - str::stream() << "Cannot add " << connectionString.toString() - << " as a shard because we detected a mongod with server " - "version older than 3.4.0. It is invalid to add v3.2 and " - "older shards through a v3.4 mongos."); - } - - - // Check whether there is a master. If there isn't, the replica set may not have been - // initiated. If the connection is a standalone, it will return true for isMaster. - bool isMaster; - status = bsonExtractBooleanField(resIsMaster, "ismaster", &isMaster); - if (!status.isOK()) { - return Status(status.code(), - str::stream() << "isMaster returned invalid 'ismaster' " - << "field when attempting to add " - << connectionString.toString() - << " as a shard: " - << status.reason()); - } - if (!isMaster) { - return {ErrorCodes::NotMaster, - str::stream() - << connectionString.toString() - << " does not have a master. 
If this is a replica set, ensure that it has a" - << " healthy primary and that the set has been properly initiated."}; - } - - const string providedSetName = connectionString.getSetName(); - const string foundSetName = resIsMaster["setName"].str(); - - // Make sure the specified replica set name (if any) matches the actual shard's replica set - if (providedSetName.empty() && !foundSetName.empty()) { - return {ErrorCodes::OperationFailed, - str::stream() << "host is part of set " << foundSetName << "; " - << "use replica set url format " - << "<setname>/<server1>,<server2>, ..."}; - } - - if (!providedSetName.empty() && foundSetName.empty()) { - return {ErrorCodes::OperationFailed, - str::stream() << "host did not return a set name; " - << "is the replica set still initializing? " - << resIsMaster}; - } - - // Make sure the set name specified in the connection string matches the one where its hosts - // belong into - if (!providedSetName.empty() && (providedSetName != foundSetName)) { - return {ErrorCodes::OperationFailed, - str::stream() << "the provided connection string (" << connectionString.toString() - << ") does not match the actual set name " - << foundSetName}; - } - - // Is it a config server? - if (resIsMaster.hasField("configsvr")) { - return {ErrorCodes::OperationFailed, - str::stream() << "Cannot add " << connectionString.toString() - << " as a shard since it is a config server"}; - } - - // If the shard is part of a replica set, make sure all the hosts mentioned in the connection - // string are part of the set. It is fine if not all members of the set are mentioned in the - // connection string, though. 
- if (!providedSetName.empty()) { - std::set<string> hostSet; - - BSONObjIterator iter(resIsMaster["hosts"].Obj()); - while (iter.more()) { - hostSet.insert(iter.next().String()); // host:port - } - - if (resIsMaster["passives"].isABSONObj()) { - BSONObjIterator piter(resIsMaster["passives"].Obj()); - while (piter.more()) { - hostSet.insert(piter.next().String()); // host:port - } - } - - if (resIsMaster["arbiters"].isABSONObj()) { - BSONObjIterator piter(resIsMaster["arbiters"].Obj()); - while (piter.more()) { - hostSet.insert(piter.next().String()); // host:port - } - } - - vector<HostAndPort> hosts = connectionString.getServers(); - for (size_t i = 0; i < hosts.size(); i++) { - const string host = hosts[i].toString(); // host:port - if (hostSet.find(host) == hostSet.end()) { - return {ErrorCodes::OperationFailed, - str::stream() << "in seed list " << connectionString.toString() << ", host " - << host - << " does not belong to replica set " - << foundSetName - << "; found " - << resIsMaster.toString()}; - } - } - } - - string actualShardName; - - if (shardProposedName) { - actualShardName = *shardProposedName; - } else if (!foundSetName.empty()) { - // Default it to the name of the replica set - actualShardName = foundSetName; - } - - // Disallow adding shard replica set with name 'config' - if (actualShardName == "config") { - return {ErrorCodes::BadValue, "use of shard replica set with name 'config' is not allowed"}; - } - - // Retrieve the most up to date connection string that we know from the replica set monitor (if - // this is a replica set shard, otherwise it will be the same value as connectionString). 
- ConnectionString actualShardConnStr = targeter->connectionString(); - - ShardType shard; - shard.setName(actualShardName); - shard.setHost(actualShardConnStr.toString()); - shard.setState(ShardType::ShardState::kShardAware); - - return shard; -} - -StatusWith<std::vector<std::string>> ShardingCatalogManagerImpl::_getDBNamesListFromShard( - OperationContext* txn, std::shared_ptr<RemoteCommandTargeter> targeter) { - - auto swCommandResponse = - _runCommandForAddShard(txn, targeter.get(), "admin", BSON("listDatabases" << 1)); - if (!swCommandResponse.isOK()) { - return swCommandResponse.getStatus(); - } - - auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus); - if (!cmdStatus.isOK()) { - return cmdStatus; - } - - auto cmdResult = std::move(swCommandResponse.getValue().response); - - vector<string> dbNames; - - for (const auto& dbEntry : cmdResult["databases"].Obj()) { - const string& dbName = dbEntry["name"].String(); - - if (!(dbName == "local" || dbName == "admin")) { - dbNames.push_back(dbName); - } - } - - return dbNames; -} - -StatusWith<std::string> ShardingCatalogManagerImpl::_generateNewShardName(OperationContext* txn) { - BSONObjBuilder shardNameRegex; - shardNameRegex.appendRegex(ShardType::name(), "^shard"); - - auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - kConfigReadSelector, - repl::ReadConcernLevel::kMajorityReadConcern, - NamespaceString(ShardType::ConfigNS), - shardNameRegex.obj(), - BSON(ShardType::name() << -1), - 1); - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - const auto& docs = findStatus.getValue().docs; - - int count = 0; - if (!docs.empty()) { - const auto shardStatus = ShardType::fromBSON(docs.front()); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - - std::istringstream is(shardStatus.getValue().getName().substr(5)); - is >> count; - count++; - } - - // TODO fix so that we can have more than 10000 automatically generated 
shard names - if (count < 9999) { - std::stringstream ss; - ss << "shard" << std::setfill('0') << std::setw(4) << count; - return ss.str(); - } - - return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); -} - -StatusWith<string> ShardingCatalogManagerImpl::addShard( - OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) { - if (shardConnectionString.type() == ConnectionString::INVALID) { - return {ErrorCodes::BadValue, "Invalid connection string"}; - } - - if (shardProposedName && shardProposedName->empty()) { - return {ErrorCodes::BadValue, "shard name cannot be empty"}; - } - - // Only one addShard operation can be in progress at a time. - Lock::ExclusiveLock lk(txn->lockState(), _kShardMembershipLock); - - // Check if this shard has already been added (can happen in the case of a retry after a network - // error, for example) and thus this addShard request should be considered a no-op. - auto existingShard = - _checkIfShardExists(txn, shardConnectionString, shardProposedName, maxSize); - if (!existingShard.isOK()) { - return existingShard.getStatus(); - } - if (existingShard.getValue()) { - // These hosts already belong to an existing shard, so report success and terminate the - // addShard request. Make sure to set the last optime for the client to the system last - // optime so that we'll still wait for replication so that this state is visible in the - // committed snapshot. - repl::ReplClientInfo::forClient(txn->getClient()).setLastOpToSystemLastOpTime(txn); - return existingShard.getValue()->getName(); - } - - // Force a reload of the ShardRegistry to ensure that, in case this addShard is to re-add a - // replica set that has recently been removed, we have detached the ReplicaSetMonitor for the - // set with that setName from the ReplicaSetMonitorManager and will create a new - // ReplicaSetMonitor when targeting the set below. 
- // Note: This is necessary because as of 3.4, removeShard is performed by mongos (unlike - // addShard), so the ShardRegistry is not synchronously reloaded on the config server when a - // shard is removed. - if (!Grid::get(txn)->shardRegistry()->reload(txn)) { - // If the first reload joined an existing one, call reload again to ensure the reload is - // fresh. - Grid::get(txn)->shardRegistry()->reload(txn); - } - - // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. - const std::shared_ptr<Shard> shard{ - Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)}; - invariant(shard); - auto targeter = shard->getTargeter(); - - auto stopMonitoringGuard = MakeGuard([&] { - if (shardConnectionString.type() == ConnectionString::SET) { - // This is a workaround for the case were we could have some bad shard being - // requested to be added and we put that bad connection string on the global replica set - // monitor registry. It needs to be cleaned up so that when a correct replica set is - // added, it will be recreated. 
- ReplicaSetMonitor::remove(shardConnectionString.getSetName()); - } - }); - - // Validate the specified connection string may serve as shard at all - auto shardStatus = - _validateHostAsShard(txn, targeter, shardProposedName, shardConnectionString); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - ShardType& shardType = shardStatus.getValue(); - - // Check that none of the existing shard candidate's dbs exist already - auto dbNamesStatus = _getDBNamesListFromShard(txn, targeter); - if (!dbNamesStatus.isOK()) { - return dbNamesStatus.getStatus(); - } - - for (const string& dbName : dbNamesStatus.getValue()) { - auto dbt = _catalogClient->getDatabase(txn, dbName); - if (dbt.isOK()) { - const auto& dbDoc = dbt.getValue().value; - return Status(ErrorCodes::OperationFailed, - str::stream() << "can't add shard " - << "'" - << shardConnectionString.toString() - << "'" - << " because a local database '" - << dbName - << "' exists in another " - << dbDoc.getPrimary()); - } else if (dbt != ErrorCodes::NamespaceNotFound) { - return dbt.getStatus(); - } - } - - // If a name for a shard wasn't provided, generate one - if (shardType.getName().empty()) { - StatusWith<string> result = _generateNewShardName(txn); - if (!result.isOK()) { - return result.getStatus(); - } - shardType.setName(result.getValue()); - } - - if (maxSize > 0) { - shardType.setMaxSizeMB(maxSize); - } - - // If the minimum allowed version for the cluster is 3.4, set the featureCompatibilityVersion to - // 3.4 on the shard. 
- if (serverGlobalParams.featureCompatibility.version.load() == - ServerGlobalParams::FeatureCompatibility::Version::k34) { - auto versionResponse = - _runCommandForAddShard(txn, - targeter.get(), - "admin", - BSON(FeatureCompatibilityVersion::kCommandName - << FeatureCompatibilityVersionCommandParser::kVersion34)); - if (!versionResponse.isOK()) { - return versionResponse.getStatus(); - } - - if (!versionResponse.getValue().commandStatus.isOK()) { - if (versionResponse.getStatus().code() == ErrorCodes::CommandNotFound) { - return Status(ErrorCodes::OperationFailed, - "featureCompatibilityVersion for cluster is 3.4, cannot add a shard " - "with version below 3.4. See " - "http://dochub.mongodb.org/core/3.4-feature-compatibility."); - } - return versionResponse.getValue().commandStatus; - } - } - - if (!MONGO_FAIL_POINT(dontUpsertShardIdentityOnNewShards)) { - auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName()); - - LOG(2) << "going to insert shardIdentity document into shard: " << shardType; - - auto swCommandResponse = - _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest); - if (!swCommandResponse.isOK()) { - return swCommandResponse.getStatus(); - } - - auto commandResponse = std::move(swCommandResponse.getValue()); - - BatchedCommandResponse batchResponse; - auto batchResponseStatus = - Shard::CommandResponse::processBatchWriteResponse(commandResponse, &batchResponse); - if (!batchResponseStatus.isOK()) { - return batchResponseStatus; - } - } - - log() << "going to insert new entry for shard into config.shards: " << shardType.toString(); - - Status result = _catalogClient->insertConfigDocument( - txn, ShardType::ConfigNS, shardType.toBSON(), ShardingCatalogClient::kMajorityWriteConcern); - if (!result.isOK()) { - log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason(); - return result; - } - - // Add all databases which were discovered on the new shard - for (const string& dbName : 
dbNamesStatus.getValue()) { - DatabaseType dbt; - dbt.setName(dbName); - dbt.setPrimary(shardType.getName()); - dbt.setSharded(false); - - Status status = _catalogClient->updateDatabase(txn, dbName, dbt); - if (!status.isOK()) { - log() << "adding shard " << shardConnectionString.toString() - << " even though could not add database " << dbName; - } - } - - // Record in changelog - BSONObjBuilder shardDetails; - shardDetails.append("name", shardType.getName()); - shardDetails.append("host", shardConnectionString.toString()); - - _catalogClient->logChange( - txn, "addShard", "", shardDetails.obj(), ShardingCatalogClient::kMajorityWriteConcern); - - // Ensure the added shard is visible to this process. - auto shardRegistry = Grid::get(txn)->shardRegistry(); - if (!shardRegistry->getShard(txn, shardType.getName()).isOK()) { - return {ErrorCodes::OperationFailed, - "Could not find shard metadata for shard after adding it. This most likely " - "indicates that the shard was removed immediately after it was added."}; - } - stopMonitoringGuard.Dismiss(); - - return shardType.getName(); -} - -Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn, - const std::string& shardName, - const std::string& zoneName) { - Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); - - auto updateStatus = _catalogClient->updateConfigDocument( - txn, - ShardType::ConfigNS, - BSON(ShardType::name(shardName)), - BSON("$addToSet" << BSON(ShardType::tags() << zoneName)), - false, - kNoWaitWriteConcern); - - if (!updateStatus.isOK()) { - return updateStatus.getStatus(); - } - - if (!updateStatus.getValue()) { - return {ErrorCodes::ShardNotFound, - str::stream() << "shard " << shardName << " does not exist"}; - } - - return Status::OK(); -} - -Status ShardingCatalogManagerImpl::removeShardFromZone(OperationContext* txn, - const std::string& shardName, - const std::string& zoneName) { - Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); - - auto configShard = 
Grid::get(txn)->shardRegistry()->getConfigShard(); - const NamespaceString shardNS(ShardType::ConfigNS); - - // - // Check whether the shard even exist in the first place. - // - - auto findShardExistsStatus = - configShard->exhaustiveFindOnConfig(txn, - kConfigPrimarySelector, - repl::ReadConcernLevel::kLocalReadConcern, - shardNS, - BSON(ShardType::name() << shardName), - BSONObj(), - 1); - - if (!findShardExistsStatus.isOK()) { - return findShardExistsStatus.getStatus(); - } - - if (findShardExistsStatus.getValue().docs.size() == 0) { - return {ErrorCodes::ShardNotFound, - str::stream() << "shard " << shardName << " does not exist"}; - } - - // - // Check how many shards belongs to this zone. - // - - auto findShardStatus = - configShard->exhaustiveFindOnConfig(txn, - kConfigPrimarySelector, - repl::ReadConcernLevel::kLocalReadConcern, - shardNS, - BSON(ShardType::tags() << zoneName), - BSONObj(), - 2); - - if (!findShardStatus.isOK()) { - return findShardStatus.getStatus(); - } - - const auto shardDocs = findShardStatus.getValue().docs; - - if (shardDocs.size() == 0) { - // The zone doesn't exists, this could be a retry. - return Status::OK(); - } - - if (shardDocs.size() == 1) { - auto shardDocStatus = ShardType::fromBSON(shardDocs.front()); - if (!shardDocStatus.isOK()) { - return shardDocStatus.getStatus(); - } - - auto shardDoc = shardDocStatus.getValue(); - if (shardDoc.getName() != shardName) { - // The last shard that belongs to this zone is a different shard. - // This could be a retry, so return OK. 
- return Status::OK(); - } - - auto findChunkRangeStatus = - configShard->exhaustiveFindOnConfig(txn, - kConfigPrimarySelector, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(TagsType::ConfigNS), - BSON(TagsType::tag() << zoneName), - BSONObj(), - 1); - - if (!findChunkRangeStatus.isOK()) { - return findChunkRangeStatus.getStatus(); - } - - if (findChunkRangeStatus.getValue().docs.size() > 0) { - return {ErrorCodes::ZoneStillInUse, - "cannot remove a shard from zone if a chunk range is associated with it"}; - } - } - - // - // Perform update. - // - - auto updateStatus = - _catalogClient->updateConfigDocument(txn, - ShardType::ConfigNS, - BSON(ShardType::name(shardName)), - BSON("$pull" << BSON(ShardType::tags() << zoneName)), - false, - kNoWaitWriteConcern); - - if (!updateStatus.isOK()) { - return updateStatus.getStatus(); - } - - // The update did not match a document, another thread could have removed it. - if (!updateStatus.getValue()) { - return {ErrorCodes::ShardNotFound, - str::stream() << "shard " << shardName << " no longer exist"}; - } - - return Status::OK(); -} - - -Status ShardingCatalogManagerImpl::assignKeyRangeToZone(OperationContext* txn, - const NamespaceString& ns, - const ChunkRange& givenRange, - const string& zoneName) { - Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); - - auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard(); - - KeyPattern shardKeyPattern{BSONObj()}; - auto fullShardKeyStatus = - includeFullShardKey(txn, configServer.get(), ns, givenRange, &shardKeyPattern); - if (!fullShardKeyStatus.isOK()) { - return fullShardKeyStatus.getStatus(); - } - - const auto& fullShardKeyRange = fullShardKeyStatus.getValue(); - - auto zoneExistStatus = - configServer->exhaustiveFindOnConfig(txn, - kConfigPrimarySelector, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ShardType::ConfigNS), - BSON(ShardType::tags() << zoneName), - BSONObj(), - 1); - - if (!zoneExistStatus.isOK()) { - return 
zoneExistStatus.getStatus(); - } - - auto zoneExist = zoneExistStatus.getValue().docs.size() > 0; - if (!zoneExist) { - return {ErrorCodes::ZoneNotFound, - str::stream() << "zone " << zoneName << " does not exist"}; - } - - auto overlapStatus = checkForOveralappedZonedKeyRange( - txn, configServer.get(), ns, fullShardKeyRange, zoneName, shardKeyPattern); - if (!overlapStatus.isOK()) { - return overlapStatus; - } - - BSONObj updateQuery( - BSON("_id" << BSON(TagsType::ns(ns.ns()) << TagsType::min(fullShardKeyRange.getMin())))); - - BSONObjBuilder updateBuilder; - updateBuilder.append("_id", - BSON(TagsType::ns(ns.ns()) << TagsType::min(fullShardKeyRange.getMin()))); - updateBuilder.append(TagsType::ns(), ns.ns()); - updateBuilder.append(TagsType::min(), fullShardKeyRange.getMin()); - updateBuilder.append(TagsType::max(), fullShardKeyRange.getMax()); - updateBuilder.append(TagsType::tag(), zoneName); - - auto updateStatus = _catalogClient->updateConfigDocument( - txn, TagsType::ConfigNS, updateQuery, updateBuilder.obj(), true, kNoWaitWriteConcern); - - if (!updateStatus.isOK()) { - return updateStatus.getStatus(); - } - - return Status::OK(); -} - -Status ShardingCatalogManagerImpl::removeKeyRangeFromZone(OperationContext* txn, - const NamespaceString& ns, - const ChunkRange& range) { - Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); - - auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard(); - - KeyPattern shardKeyPattern{BSONObj()}; - auto fullShardKeyStatus = - includeFullShardKey(txn, configServer.get(), ns, range, &shardKeyPattern); - if (!fullShardKeyStatus.isOK()) { - return fullShardKeyStatus.getStatus(); - } - - BSONObjBuilder removeBuilder; - removeBuilder.append("_id", BSON(TagsType::ns(ns.ns()) << TagsType::min(range.getMin()))); - removeBuilder.append(TagsType::max(), range.getMax()); - - return _catalogClient->removeConfigDocuments( - txn, TagsType::ConfigNS, removeBuilder.obj(), kNoWaitWriteConcern); -} - -Status 
ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, - const NamespaceString& ns, - const OID& requestEpoch, - const ChunkRange& range, - const std::vector<BSONObj>& splitPoints, - const std::string& shardName) { - // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and - // migrations - // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ - // move chunks on different collections to proceed in parallel - Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); - - // Get the chunk with highest version for this namespace - auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ChunkType::ConfigNS), - BSON("ns" << ns.ns()), - BSON(ChunkType::DEPRECATED_lastmod << -1), - 1); - - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - const auto& chunksVector = findStatus.getValue().docs; - if (chunksVector.empty()) - return {ErrorCodes::IllegalOperation, - "collection does not exist, isn't sharded, or has no chunks"}; - - ChunkVersion collVersion = - ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); - - // Return an error if epoch of chunk does not match epoch of request - if (collVersion.epoch() != requestEpoch) { - return {ErrorCodes::StaleEpoch, - "epoch of chunk does not match epoch of request. 
This most likely means " - "that the collection was dropped and re-created."}; - } - - std::vector<ChunkType> newChunks; - - ChunkVersion currentMaxVersion = collVersion; - - auto startKey = range.getMin(); - auto newChunkBounds(splitPoints); - newChunkBounds.push_back(range.getMax()); - - BSONArrayBuilder updates; - - for (const auto& endKey : newChunkBounds) { - // Verify the split points are all within the chunk - if (endKey.woCompare(range.getMax()) != 0 && !range.containsKey(endKey)) { - return {ErrorCodes::InvalidOptions, - str::stream() << "Split key " << endKey << " not contained within chunk " - << range.toString()}; - } - - // Verify the split points came in increasing order - if (endKey.woCompare(startKey) < 0) { - return { - ErrorCodes::InvalidOptions, - str::stream() << "Split keys must be specified in strictly increasing order. Key " - << endKey - << " was specified after " - << startKey - << "."}; - } - - // Verify that splitPoints are not repeated - if (endKey.woCompare(startKey) == 0) { - return {ErrorCodes::InvalidOptions, - str::stream() << "Split on lower bound of chunk " - << ChunkRange(startKey, endKey).toString() - << "is not allowed"}; - } - - // verify that splits don't create too-big shard keys - Status shardKeyStatus = ShardKeyPattern::checkShardKeySize(endKey); - if (!shardKeyStatus.isOK()) { - return shardKeyStatus; - } - - // splits only update the 'minor' portion of version - currentMaxVersion.incMinor(); - - // build an update operation against the chunks collection of the config database - // with upsert true - BSONObjBuilder op; - op.append("op", "u"); - op.appendBool("b", true); - op.append("ns", ChunkType::ConfigNS); - - // add the modified (new) chunk information as the update object - BSONObjBuilder n(op.subobjStart("o")); - n.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); - currentMaxVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); - n.append(ChunkType::ns(), ns.ns()); - n.append(ChunkType::min(), 
startKey); - n.append(ChunkType::max(), endKey); - n.append(ChunkType::shard(), shardName); - n.done(); - - // add the chunk's _id as the query part of the update statement - BSONObjBuilder q(op.subobjStart("o2")); - q.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); - q.done(); - - updates.append(op.obj()); - - // remember this chunk info for logging later - ChunkType chunk; - chunk.setMin(startKey); - chunk.setMax(endKey); - chunk.setVersion(currentMaxVersion); - - newChunks.push_back(std::move(chunk)); - - startKey = endKey; - } - - BSONArrayBuilder preCond; - { - BSONObjBuilder b; - b.append("ns", ChunkType::ConfigNS); - b.append("q", - BSON("query" << BSON(ChunkType::ns(ns.ns()) << ChunkType::min() << range.getMin() - << ChunkType::max() - << range.getMax()) - << "orderby" - << BSON(ChunkType::DEPRECATED_lastmod() << -1))); - { - BSONObjBuilder bb(b.subobjStart("res")); - bb.append(ChunkType::DEPRECATED_epoch(), requestEpoch); - bb.append(ChunkType::shard(), shardName); - } - preCond.append(b.obj()); - } - - // apply the batch of updates to remote and local metadata - Status applyOpsStatus = - grid.catalogClient(txn)->applyChunkOpsDeprecated(txn, - updates.arr(), - preCond.arr(), - ns.ns(), - currentMaxVersion, - WriteConcernOptions(), - repl::ReadConcernLevel::kLocalReadConcern); - if (!applyOpsStatus.isOK()) { - return applyOpsStatus; - } - - // log changes - BSONObjBuilder logDetail; - { - BSONObjBuilder b(logDetail.subobjStart("before")); - b.append(ChunkType::min(), range.getMin()); - b.append(ChunkType::max(), range.getMax()); - collVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod()); - } - - if (newChunks.size() == 2) { - _appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]); - _appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]); - - grid.catalogClient(txn)->logChange( - txn, "split", ns.ns(), logDetail.obj(), WriteConcernOptions()); - } else { - BSONObj beforeDetailObj = logDetail.obj(); - BSONObj 
firstDetailObj = beforeDetailObj.getOwned(); - const int newChunksSize = newChunks.size(); - - for (int i = 0; i < newChunksSize; i++) { - BSONObjBuilder chunkDetail; - chunkDetail.appendElements(beforeDetailObj); - chunkDetail.append("number", i + 1); - chunkDetail.append("of", newChunksSize); - _appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]); - - grid.catalogClient(txn)->logChange( - txn, "multi-split", ns.ns(), chunkDetail.obj(), WriteConcernOptions()); - } - } - - return applyOpsStatus; -} - -Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn, - const NamespaceString& ns, - const OID& requestEpoch, - const std::vector<BSONObj>& chunkBoundaries, - const std::string& shardName) { - // This method must never be called with empty chunks to merge - invariant(!chunkBoundaries.empty()); - - // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and - // migrations - // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ - // move chunks on different collections to proceed in parallel - Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); - - // Get the chunk with the highest version for this namespace - auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ChunkType::ConfigNS), - BSON("ns" << ns.ns()), - BSON(ChunkType::DEPRECATED_lastmod << -1), - 1); - - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - const auto& chunksVector = findStatus.getValue().docs; - if (chunksVector.empty()) - return {ErrorCodes::IllegalOperation, - "collection does not exist, isn't sharded, or has no chunks"}; - - ChunkVersion collVersion = - ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); - - // Return an error if epoch of chunk does not match epoch of request - if 
(collVersion.epoch() != requestEpoch) { - return {ErrorCodes::StaleEpoch, - "epoch of chunk does not match epoch of request. This most likely means " - "that the collection was dropped and re-created."}; - } - - // Build chunks to be merged - std::vector<ChunkType> chunksToMerge; - - ChunkType itChunk; - itChunk.setMax(chunkBoundaries.front()); - itChunk.setNS(ns.ns()); - itChunk.setShard(shardName); - - // Do not use the first chunk boundary as a max bound while building chunks - for (size_t i = 1; i < chunkBoundaries.size(); ++i) { - itChunk.setMin(itChunk.getMax()); - - // Ensure the chunk boundaries are strictly increasing - if (chunkBoundaries[i].woCompare(itChunk.getMin()) <= 0) { - return { - ErrorCodes::InvalidOptions, - str::stream() - << "Chunk boundaries must be specified in strictly increasing order. Boundary " - << chunkBoundaries[i] - << " was specified after " - << itChunk.getMin() - << "."}; - } - - itChunk.setMax(chunkBoundaries[i]); - chunksToMerge.push_back(itChunk); - } - - ChunkVersion mergeVersion = collVersion; - mergeVersion.incMinor(); - - auto updates = buildMergeChunksApplyOpsUpdates(chunksToMerge, mergeVersion); - auto preCond = buildMergeChunksApplyOpsPrecond(chunksToMerge, collVersion); - - // apply the batch of updates to remote and local metadata - Status applyOpsStatus = - grid.catalogClient(txn)->applyChunkOpsDeprecated(txn, - updates, - preCond, - ns.ns(), - mergeVersion, - WriteConcernOptions(), - repl::ReadConcernLevel::kLocalReadConcern); - if (!applyOpsStatus.isOK()) { - return applyOpsStatus; - } - - // log changes - BSONObjBuilder logDetail; - { - BSONArrayBuilder b(logDetail.subarrayStart("merged")); - for (auto chunkToMerge : chunksToMerge) { - b.append(chunkToMerge.toBSON()); - } - } - collVersion.addToBSON(logDetail, "prevShardVersion"); - mergeVersion.addToBSON(logDetail, "mergedVersion"); - - grid.catalogClient(txn)->logChange( - txn, "merge", ns.ns(), logDetail.obj(), WriteConcernOptions()); - - return applyOpsStatus; 
-} - -namespace { - -/** - * Checks that the epoch in the version the shard sent with the command matches the epoch of the - * collection version found on the config server. It is possible for a migration to end up - * running partly without the protection of the distributed lock. This function checks that the - * collection has not been dropped and recreated since the migration began, unbeknown to the - * shard when the command was sent. - */ -static Status checkCollectionVersionEpoch(OperationContext* txn, - const NamespaceString& nss, - const ChunkType& aChunk, - const OID& collectionEpoch) { - auto findResponseWith = - Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ChunkType::ConfigNS), - BSON(ChunkType::ns() << nss.ns()), - BSONObj(), - 1); - if (!findResponseWith.isOK()) { - return findResponseWith.getStatus(); - } - - if (MONGO_FAIL_POINT(migrationCommitVersionError)) { - uassert(ErrorCodes::StaleEpoch, - "failpoint 'migrationCommitVersionError' generated error", - false); - } - - if (findResponseWith.getValue().docs.empty()) { - return Status( - ErrorCodes::IncompatibleShardingMetadata, - str::stream() - << "Could not find any chunks for collection '" - << nss.ns() - << "'. The collection has been dropped since the migration began. Aborting" - " migration commit for chunk (" - << redact(aChunk.getRange().toString()) - << ")."); - } - - auto chunkWith = ChunkType::fromBSON(findResponseWith.getValue().docs.front()); - if (!chunkWith.isOK()) { - return chunkWith.getStatus(); - } else if (chunkWith.getValue().getVersion().epoch() != collectionEpoch) { - return Status(ErrorCodes::StaleEpoch, - str::stream() << "The collection '" << nss.ns() - << "' has been dropped and recreated since the migration began." 
- " The config server's collection version epoch is now '" - << chunkWith.getValue().getVersion().epoch().toString() - << "', but the shard's is " - << collectionEpoch.toString() - << "'. Aborting migration commit for chunk (" - << redact(aChunk.getRange().toString()) - << ")."); - } - return Status::OK(); -} - -static Status checkChunkIsOnShard(OperationContext* txn, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const ShardId& shard) { - BSONObj chunkQuery = - BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max - << ChunkType::shard() - << shard); - // Must use local read concern because we're going to perform subsequent writes. - auto findResponseWith = - Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ChunkType::ConfigNS), - chunkQuery, - BSONObj(), - 1); - if (!findResponseWith.isOK()) { - return findResponseWith.getStatus(); - } - if (findResponseWith.getValue().docs.empty()) { - return Status( - ErrorCodes::Error(40165), - str::stream() - << "Could not find the chunk (" - << chunkQuery.toString() - << ") on the shard. Cannot execute the migration commit with invalid chunks."); - } - return Status::OK(); -} - -static BSONObj makeCommitChunkApplyOpsCommand(const NamespaceString& nss, - const ChunkType& migratedChunk, - const boost::optional<ChunkType>& controlChunk, - StringData fromShard, - StringData toShard) { - - // Update migratedChunk's version and shard. 
- BSONArrayBuilder updates; - { - BSONObjBuilder op; - op.append("op", "u"); - op.appendBool("b", false); // No upserting - op.append("ns", ChunkType::ConfigNS); - - BSONObjBuilder n(op.subobjStart("o")); - n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin())); - migratedChunk.getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod()); - n.append(ChunkType::ns(), nss.ns()); - n.append(ChunkType::min(), migratedChunk.getMin()); - n.append(ChunkType::max(), migratedChunk.getMax()); - n.append(ChunkType::shard(), toShard); - n.done(); - - BSONObjBuilder q(op.subobjStart("o2")); - q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin())); - q.done(); - - updates.append(op.obj()); - } - - // If we have a controlChunk, update its chunk version. - if (controlChunk) { - BSONObjBuilder op; - op.append("op", "u"); - op.appendBool("b", false); - op.append("ns", ChunkType::ConfigNS); - - BSONObjBuilder n(op.subobjStart("o")); - n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin())); - controlChunk->getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod()); - n.append(ChunkType::ns(), nss.ns()); - n.append(ChunkType::min(), controlChunk->getMin()); - n.append(ChunkType::max(), controlChunk->getMax()); - n.append(ChunkType::shard(), fromShard); - n.done(); - - BSONObjBuilder q(op.subobjStart("o2")); - q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin())); - q.done(); - - updates.append(op.obj()); - } - - // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will - // fail because of the GlobalWrite lock CommitChunkMigration already holds. Replication will - // not be able to take the lock it requires. 
- return BSON("applyOps" << updates.arr()); -} - -} // namespace - -StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration( - OperationContext* txn, - const NamespaceString& nss, - const ChunkType& migratedChunk, - const boost::optional<ChunkType>& controlChunk, - const OID& collectionEpoch, - const ShardId& fromShard, - const ShardId& toShard) { - - // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and - // migrations. - // - // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions - // for migrated chunks are generated within the command and must be committed to the database - // before another chunk commit generates new ChunkVersions in the same manner. - // - // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ - // move chunks on different collections to proceed in parallel. - // (Note: This is not needed while we have a global lock, taken here only for consistency.) - Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); - - // Ensure that the epoch passed in still matches the real state of the database. - - auto epochCheck = checkCollectionVersionEpoch(txn, nss, migratedChunk, collectionEpoch); - if (!epochCheck.isOK()) { - return epochCheck; - } - - // Check that migratedChunk and controlChunk are where they should be, on fromShard. - - auto migratedOnShard = - checkChunkIsOnShard(txn, nss, migratedChunk.getMin(), migratedChunk.getMax(), fromShard); - if (!migratedOnShard.isOK()) { - return migratedOnShard; - } - - if (controlChunk) { - auto controlOnShard = checkChunkIsOnShard( - txn, nss, controlChunk->getMin(), controlChunk->getMax(), fromShard); - if (!controlOnShard.isOK()) { - return controlOnShard; - } - } - - // Must use local read concern because we will perform subsequent writes. 
- auto findResponse = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ChunkType::ConfigNS), - BSON("ns" << nss.ns()), - BSON(ChunkType::DEPRECATED_lastmod << -1), - 1); - if (!findResponse.isOK()) { - return findResponse.getStatus(); - } - - std::vector<BSONObj> chunksVector = std::move(findResponse.getValue().docs); - if (chunksVector.empty()) { - return Status(ErrorCodes::Error(40164), - str::stream() << "Tried to find max chunk version for collection '" - << nss.ns() - << ", but found no chunks"); - } - - ChunkVersion currentMaxVersion = - ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); - - // Use the incremented major version of the result returned. - - // Generate the new versions of migratedChunk and controlChunk. - // Migrating chunk's minor version will be 0. - ChunkType newMigratedChunk = migratedChunk; - newMigratedChunk.setVersion( - ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch())); - - // Control chunk's minor version will be 1 (if control chunk is present). 
- boost::optional<ChunkType> newControlChunk = boost::none; - if (controlChunk) { - newControlChunk = controlChunk.get(); - newControlChunk->setVersion( - ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch())); - } - - auto command = makeCommitChunkApplyOpsCommand( - nss, newMigratedChunk, newControlChunk, fromShard.toString(), toShard.toString()); - - StatusWith<Shard::CommandResponse> applyOpsCommandResponse = - Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - nss.db().toString(), - command, - Shard::RetryPolicy::kIdempotent); - - if (!applyOpsCommandResponse.isOK()) { - return applyOpsCommandResponse.getStatus(); - } - if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) { - return applyOpsCommandResponse.getValue().commandStatus; - } - - BSONObjBuilder result; - newMigratedChunk.getVersion().appendWithFieldForCommands(&result, "migratedChunkVersion"); - if (controlChunk) { - newControlChunk->getVersion().appendWithFieldForCommands(&result, "controlChunkVersion"); - } - return result.obj(); -} - -void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { - _executorForAddShard->appendConnectionStats(stats); -} - Status ShardingCatalogManagerImpl::initializeConfigDatabaseIfNeeded(OperationContext* txn) { { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -1764,13 +120,15 @@ void ShardingCatalogManagerImpl::discardCachedConfigDatabaseInitializationState( } Status ShardingCatalogManagerImpl::_initConfigVersion(OperationContext* txn) { + const auto catalogClient = Grid::get(txn)->catalogClient(txn); + auto versionStatus = - _catalogClient->getConfigVersion(txn, repl::ReadConcernLevel::kLocalReadConcern); + catalogClient->getConfigVersion(txn, repl::ReadConcernLevel::kLocalReadConcern); if (!versionStatus.isOK()) { return versionStatus.getStatus(); } - auto versionInfo = versionStatus.getValue(); + 
const auto& versionInfo = versionStatus.getValue(); if (versionInfo.getMinCompatibleVersion() > CURRENT_CONFIG_VERSION) { return {ErrorCodes::IncompatibleShardingConfigVersion, str::stream() << "current version v" << CURRENT_CONFIG_VERSION @@ -1785,7 +143,7 @@ Status ShardingCatalogManagerImpl::_initConfigVersion(OperationContext* txn) { newVersion.setCurrentVersion(CURRENT_CONFIG_VERSION); BSONObj versionObj(newVersion.toBSON()); - auto insertStatus = _catalogClient->insertConfigDocument( + auto insertStatus = catalogClient->insertConfigDocument( txn, VersionType::ConfigNS, versionObj, kNoWaitWriteConcern); return insertStatus; @@ -1916,357 +274,6 @@ Status ShardingCatalogManagerImpl::_initConfigIndexes(OperationContext* txn) { return Status::OK(); } -Status ShardingCatalogManagerImpl::initializeShardingAwarenessOnUnawareShards( - OperationContext* txn) { - auto swShards = _getAllShardingUnawareShards(txn); - if (!swShards.isOK()) { - return swShards.getStatus(); - } else { - auto shards = std::move(swShards.getValue()); - for (const auto& shard : shards) { - auto status = upsertShardIdentityOnShard(txn, shard); - if (!status.isOK()) { - return status; - } - } - } - - // Note: this OK status means only that tasks to initialize sharding awareness on the shards - // were scheduled against the task executor, not that the tasks actually succeeded. 
- return Status::OK(); -} - -StatusWith<std::vector<ShardType>> ShardingCatalogManagerImpl::_getAllShardingUnawareShards( - OperationContext* txn) { - std::vector<ShardType> shards; - auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( - txn, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kLocalReadConcern, - NamespaceString(ShardType::ConfigNS), - BSON( - "state" << BSON("$ne" << static_cast<std::underlying_type<ShardType::ShardState>::type>( - ShardType::ShardState::kShardAware))), // shard is sharding unaware - BSONObj(), // no sort - boost::none); // no limit - if (!findStatus.isOK()) { - return findStatus.getStatus(); - } - - for (const BSONObj& doc : findStatus.getValue().docs) { - auto shardRes = ShardType::fromBSON(doc); - if (!shardRes.isOK()) { - return {ErrorCodes::FailedToParse, - stream() << "Failed to parse shard " << causedBy(shardRes.getStatus()) << doc}; - } - - Status validateStatus = shardRes.getValue().validate(); - if (!validateStatus.isOK()) { - return {validateStatus.code(), - stream() << "Failed to validate shard " << causedBy(validateStatus) << doc}; - } - - shards.push_back(shardRes.getValue()); - } - - return shards; -} - -Status ShardingCatalogManagerImpl::upsertShardIdentityOnShard(OperationContext* txn, - ShardType shardType) { - - auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName()); - - auto swConnString = ConnectionString::parse(shardType.getHost()); - if (!swConnString.isOK()) { - return swConnString.getStatus(); - } - - // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter - // instead. 
- const std::shared_ptr<Shard> shard{ - Grid::get(txn)->shardRegistry()->createConnection(swConnString.getValue())}; - invariant(shard); - auto targeter = shard->getTargeter(); - - _scheduleAddShardTask( - std::move(shardType), std::move(targeter), std::move(commandRequest), false); - - return Status::OK(); -} - -void ShardingCatalogManagerImpl::cancelAddShardTaskIfNeeded(const ShardId& shardId) { - stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); - if (_hasAddShardHandle_inlock(shardId)) { - auto cbHandle = _getAddShardHandle_inlock(shardId); - _executorForAddShard->cancel(cbHandle); - // Untrack the handle here so that if this shard is re-added before the CallbackCanceled - // status is delivered to the callback, a new addShard task for the shard will be - // created. - _untrackAddShardHandle_inlock(shardId); - } -} - -void ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled( - const CallbackArgs& cbArgs, - const ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter, - const BSONObj commandRequest) { - if (cbArgs.status == ErrorCodes::CallbackCanceled) { - return; - } - _scheduleAddShardTask( - std::move(shardType), std::move(targeter), std::move(commandRequest), true); -} - -void ShardingCatalogManagerImpl::_scheduleAddShardTask( - const ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter, - const BSONObj commandRequest, - const bool isRetry) { - stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); - - if (isRetry) { - // Untrack the handle from scheduleWorkAt, and schedule a new addShard task. - _untrackAddShardHandle_inlock(shardType.getName()); - } else { - // We should never be able to schedule an addShard task while one is running, because - // there is a unique index on the _id field in config.shards. - invariant(!_hasAddShardHandle_inlock(shardType.getName())); - } - - // Schedule the shardIdentity upsert request to run immediately, and track the handle. 
- - auto swHost = targeter->findHostWithMaxWait(ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - Milliseconds(kDefaultFindHostMaxWaitTime)); - if (!swHost.isOK()) { - // A 3.2 mongos must have previously successfully communicated with hosts in this shard, - // so a failure to find a host here is probably transient, and it is safe to retry. - warning() << "Failed to find host for shard " << shardType - << " when trying to upsert a shardIdentity document, " - << causedBy(swHost.getStatus()); - const Date_t now = _executorForAddShard->now(); - const Date_t when = now + getAddShardTaskRetryInterval(); - _trackAddShardHandle_inlock( - shardType.getName(), - _executorForAddShard->scheduleWorkAt( - when, - stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, - this, - stdx::placeholders::_1, - shardType, - std::move(targeter), - std::move(commandRequest)))); - return; - } - - executor::RemoteCommandRequest request( - swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); - - const RemoteCommandCallbackFn callback = - stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse, - this, - stdx::placeholders::_1, - shardType, - std::move(targeter)); - - if (isRetry) { - log() << "Retrying upsert of shardIdentity document into shard " << shardType.getName(); - } - _trackAddShardHandle_inlock(shardType.getName(), - _executorForAddShard->scheduleRemoteCommand(request, callback)); -} - -void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( - const RemoteCommandCallbackArgs& cbArgs, - ShardType shardType, - std::shared_ptr<RemoteCommandTargeter> targeter) { - stdx::unique_lock<stdx::mutex> lk(_addShardHandlesMutex); - - // If the callback has been canceled (either due to shutdown or the shard being removed), we - // do not need to reschedule the task or update config.shards. 
- Status responseStatus = cbArgs.response.status; - if (responseStatus == ErrorCodes::CallbackCanceled) { - return; - } - - // If the handle no longer exists, the shard must have been removed, but the callback must not - // have been canceled until after the task had completed. In this case as well, we do not need - // to reschedule the task or update config.shards. - if (!_hasAddShardHandle_inlock(shardType.getName())) { - return; - } - - // Untrack the handle from scheduleRemoteCommand regardless of whether the command - // succeeded. If it failed, we will track the handle for the rescheduled task before - // releasing the mutex. - _untrackAddShardHandle_inlock(shardType.getName()); - - // Examine the response to determine if the upsert succeeded. - - bool rescheduleTask = false; - - auto swResponse = cbArgs.response; - if (!swResponse.isOK()) { - warning() << "Failed to upsert shardIdentity document during addShard into shard " - << shardType.getName() << "(" << shardType.getHost() - << "). The shardIdentity upsert will continue to be retried. " - << causedBy(swResponse.status); - rescheduleTask = true; - } else { - // Create a CommandResponse object in order to use processBatchWriteResponse. 
- BSONObj responseObj = swResponse.data.getOwned(); - BSONObj responseMetadata = swResponse.metadata.getOwned(); - Status commandStatus = getStatusFromCommandResult(responseObj); - Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); - Shard::CommandResponse commandResponse(std::move(responseObj), - std::move(responseMetadata), - std::move(commandStatus), - std::move(writeConcernStatus)); - - BatchedCommandResponse batchResponse; - auto batchResponseStatus = - Shard::CommandResponse::processBatchWriteResponse(commandResponse, &batchResponse); - if (!batchResponseStatus.isOK()) { - if (batchResponseStatus == ErrorCodes::DuplicateKey) { - warning() - << "Received duplicate key error when inserting the shardIdentity " - "document into " - << shardType.getName() << "(" << shardType.getHost() - << "). This means the shard has a shardIdentity document with a clusterId " - "that differs from this cluster's clusterId. It may still belong to " - "or not have been properly removed from another cluster. The " - "shardIdentity upsert will continue to be retried."; - } else { - warning() << "Failed to upsert shardIdentity document into shard " - << shardType.getName() << "(" << shardType.getHost() - << ") during addShard. The shardIdentity upsert will continue to be " - "retried. " - << causedBy(batchResponseStatus); - } - rescheduleTask = true; - } - } - - if (rescheduleTask) { - // If the command did not succeed, schedule the upsert shardIdentity task again with a - // delay. - const Date_t now = _executorForAddShard->now(); - const Date_t when = now + getAddShardTaskRetryInterval(); - - // Track the handle from scheduleWorkAt. 
- _trackAddShardHandle_inlock( - shardType.getName(), - _executorForAddShard->scheduleWorkAt( - when, - stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, - this, - stdx::placeholders::_1, - shardType, - std::move(targeter), - std::move(cbArgs.request.cmdObj)))); - return; - } - - // If the command succeeded, update config.shards to mark the shard as shardAware. - - // Release the _addShardHandlesMutex before updating config.shards, since it involves disk - // I/O. - // At worst, a redundant addShard task will be scheduled by a new primary if the current - // primary fails during that write. - lk.unlock(); - - // This thread is part of a thread pool owned by the addShard TaskExecutor. Threads in that - // pool are not created with Client objects associated with them, so a Client is created and - // attached here to do the local update. The Client is destroyed at the end of the scope, - // leaving the thread state as it was before. - Client::initThread(getThreadName().c_str()); - ON_BLOCK_EXIT([&] { Client::destroy(); }); - - // Use the thread's Client to create an OperationContext to perform the local write to - // config.shards. This OperationContext will automatically be destroyed when it goes out of - // scope at the end of this code block. - auto txnPtr = cc().makeOperationContext(); - - // Use kNoWaitWriteConcern to prevent waiting in this callback, since we don't handle a - // failed response anyway. If the write is rolled back, the new config primary will attempt to - // initialize sharding awareness on this shard again, and this update to config.shards will - // be automatically retried then. If it fails because the shard was removed through the normal - // removeShard path (so the entry in config.shards was deleted), no new addShard task will - // get scheduled on the next transition to primary. 
- auto updateStatus = _catalogClient->updateConfigDocument( - txnPtr.get(), - ShardType::ConfigNS, - BSON(ShardType::name(shardType.getName())), - BSON("$set" << BSON(ShardType::state() - << static_cast<std::underlying_type<ShardType::ShardState>::type>( - ShardType::ShardState::kShardAware))), - false, - kNoWaitWriteConcern); - - if (!updateStatus.isOK()) { - warning() << "Failed to mark shard " << shardType.getName() << "(" << shardType.getHost() - << ") as shardAware in config.shards. This will be retried the next time a " - "config server transitions to primary. " - << causedBy(updateStatus.getStatus()); - } -} - -BSONObj ShardingCatalogManagerImpl::createShardIdentityUpsertForAddShard( - OperationContext* txn, const std::string& shardName) { - std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); - - BSONObjBuilder query; - query.append("_id", "shardIdentity"); - query.append(ShardIdentityType::shardName(), shardName); - query.append(ShardIdentityType::clusterId(), ClusterIdentityLoader::get(txn)->getClusterId()); - updateDoc->setQuery(query.obj()); - - BSONObjBuilder update; - { - BSONObjBuilder set(update.subobjStart("$set")); - set.append(ShardIdentityType::configsvrConnString(), - Grid::get(txn)->shardRegistry()->getConfigServerConnectionString().toString()); - } - updateDoc->setUpdateExpr(update.obj()); - updateDoc->setUpsert(true); - - std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); - updateRequest->addToUpdates(updateDoc.release()); - - BatchedCommandRequest commandRequest(updateRequest.release()); - commandRequest.setNS(NamespaceString::kConfigCollectionNamespace); - commandRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); - - return commandRequest.toBSON(); -} - - -bool ShardingCatalogManagerImpl::_hasAddShardHandle_inlock(const ShardId& shardId) { - return _addShardHandles.find(shardId) != _addShardHandles.end(); -} - -const CallbackHandle& 
ShardingCatalogManagerImpl::_getAddShardHandle_inlock( - const ShardId& shardId) { - invariant(_hasAddShardHandle_inlock(shardId)); - return _addShardHandles.find(shardId)->second; -} - -void ShardingCatalogManagerImpl::_trackAddShardHandle_inlock( - const ShardId shardId, const StatusWith<CallbackHandle>& swHandle) { - if (swHandle.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(40219, swHandle.getStatus()); - _addShardHandles.insert(std::pair<ShardId, CallbackHandle>(shardId, swHandle.getValue())); -} - -void ShardingCatalogManagerImpl::_untrackAddShardHandle_inlock(const ShardId& shardId) { - auto it = _addShardHandles.find(shardId); - invariant(it != _addShardHandles.end()); - _addShardHandles.erase(shardId); -} - Status ShardingCatalogManagerImpl::setFeatureCompatibilityVersionOnShards( OperationContext* txn, const std::string& version) { @@ -2274,9 +281,9 @@ Status ShardingCatalogManagerImpl::setFeatureCompatibilityVersionOnShards( Lock::SharedLock lk(txn->lockState(), _kShardMembershipLock); std::vector<ShardId> shardIds; - grid.shardRegistry()->getAllShardIds(&shardIds); + Grid::get(txn)->shardRegistry()->getAllShardIds(&shardIds); for (const ShardId& shardId : shardIds) { - const auto shardStatus = grid.shardRegistry()->getShard(txn, shardId); + const auto shardStatus = Grid::get(txn)->shardRegistry()->getShard(txn, shardId); if (!shardStatus.isOK()) { continue; } diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/sharding_catalog_manager_impl.h index 744bc30b31f..5d81313446c 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.h @@ -28,36 +28,22 @@ #pragma once -#include <vector> - #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/executor/task_executor.h" #include "mongo/s/catalog/sharding_catalog_manager.h" -#include "mongo/s/catalog/type_chunk.h" -#include "mongo/s/client/shard_registry.h" +#include 
"mongo/s/client/shard.h" #include "mongo/stdx/mutex.h" namespace mongo { -class DatabaseType; class RemoteCommandTargeter; -class ShardingCatalogClient; -class VersionType; -class ShardId; -template <typename T> -class StatusWith; - -namespace executor { -class TaskExecutor; -} // namespace executor /** * Implements the catalog manager for writing to replica set config servers. */ class ShardingCatalogManagerImpl final : public ShardingCatalogManager { public: - ShardingCatalogManagerImpl(ShardingCatalogClient* catalogClient, - std::unique_ptr<executor::TaskExecutor> addShardExecutor); + ShardingCatalogManagerImpl(std::unique_ptr<executor::TaskExecutor> addShardExecutor); virtual ~ShardingCatalogManagerImpl(); /** @@ -68,10 +54,9 @@ public: void shutDown(OperationContext* txn) override; - StatusWith<std::string> addShard(OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) override; + Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; + + void discardCachedConfigDatabaseInitializationState() override; Status addShardToZone(OperationContext* txn, const std::string& shardName, @@ -113,15 +98,15 @@ public: void appendConnectionStats(executor::ConnectionPoolStats* stats) override; - Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; - - void discardCachedConfigDatabaseInitializationState() override; + StatusWith<std::string> addShard(OperationContext* txn, + const std::string* shardProposedName, + const ConnectionString& shardConnectionString, + const long long maxSize) override; Status initializeShardingAwarenessOnUnawareShards(OperationContext* txn) override; Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) override; - BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn, const std::string& shardName) override; @@ -132,9 +117,15 @@ public: private: /** - * Generates a unique name to be given to a 
newly added shard. + * Performs the necessary checks for version compatibility and creates a new config.version + * document if the current cluster config is empty. + */ + Status _initConfigVersion(OperationContext* txn); + + /** + * Builds all the expected indexes on the config server. */ - StatusWith<std::string> _generateNewShardName(OperationContext* txn); + Status _initConfigIndexes(OperationContext* txn); /** * Used during addShard to determine if there is already an existing shard that matches the @@ -190,17 +181,6 @@ private: const BSONObj& cmdObj); /** - * Returns the current cluster schema/protocol version. - */ - StatusWith<VersionType> _getConfigVersion(OperationContext* txn); - - /** - * Performs the necessary checks for version compatibility and creates a new config.version - * document if the current cluster config is empty. - */ - Status _initConfigVersion(OperationContext* txn); - - /** * Retrieves all shards that are not marked as sharding aware (state = 1) in this cluster. */ StatusWith<std::vector<ShardType>> _getAllShardingUnawareShards(OperationContext* txn); @@ -265,11 +245,6 @@ private: */ void _untrackAddShardHandle_inlock(const ShardId& shardId); - /** - * Builds all the expected indexes on the config server. - */ - Status _initConfigIndexes(OperationContext* txn); - // // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. @@ -281,24 +256,19 @@ private: stdx::mutex _mutex; - // Pointer to the ShardingCatalogClient that can be used to read config server data. - // This pointer is not owned, so it is important that the object it points to continues to be - // valid for the lifetime of this ShardingCatalogManager. - ShardingCatalogClient* _catalogClient; // (R) - - // Executor specifically used for sending commands to servers that are in the process of being - // added as shards. 
Does not have any connection hook set on it, thus it can be used to talk - // to servers that are not yet in the ShardRegistry. - std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R) - // True if shutDown() has been called. False, otherwise. - bool _inShutdown = false; // (M) + bool _inShutdown{false}; // (M) // True if startup() has been called. - bool _started = false; // (M) + bool _started{false}; // (M) // True if initializeConfigDatabaseIfNeeded() has been called and returned successfully. - bool _configInitialized = false; // (M) + bool _configInitialized{false}; // (M) + + // Executor specifically used for sending commands to servers that are in the process of being + // added as shards. Does not have any connection hook set on it, thus it can be used to talk + // to servers that are not yet in the ShardRegistry. + std::unique_ptr<executor::TaskExecutor> _executorForAddShard; // (R) // For rolling upgrade and backwards compatibility with 3.2 mongos, maintains a mapping of // a shardId to an outstanding addShard task scheduled against the _executorForAddShard. diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp deleted file mode 100644 index 44a0e884bb8..00000000000 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. 
If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/s/catalog/sharding_catalog_manager_mock.h" - -#include "mongo/base/status.h" -#include "mongo/base/status_with.h" - -namespace mongo { - -using std::string; -using std::vector; - -ShardingCatalogManagerMock::ShardingCatalogManagerMock() = default; - -ShardingCatalogManagerMock::~ShardingCatalogManagerMock() = default; - -Status ShardingCatalogManagerMock::startup() { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -void ShardingCatalogManagerMock::shutDown(OperationContext* txn) {} - -StatusWith<string> ShardingCatalogManagerMock::addShard( - OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -Status ShardingCatalogManagerMock::addShardToZone(OperationContext* txn, - const std::string& shardName, - const std::string& zoneName) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -Status ShardingCatalogManagerMock::removeShardFromZone(OperationContext* txn, - const std::string& 
shardName, - const std::string& zoneName) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -Status ShardingCatalogManagerMock::assignKeyRangeToZone(OperationContext* txn, - const NamespaceString& ns, - const ChunkRange& range, - const std::string& zoneName) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -Status ShardingCatalogManagerMock::removeKeyRangeFromZone(OperationContext* txn, - const NamespaceString& ns, - const ChunkRange& range) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -Status ShardingCatalogManagerMock::commitChunkSplit(OperationContext* txn, - const NamespaceString& ns, - const OID& requestEpoch, - const ChunkRange& range, - const std::vector<BSONObj>& splitPoints, - const std::string& shardName) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -Status ShardingCatalogManagerMock::commitChunkMerge(OperationContext* txn, - const NamespaceString& ns, - const OID& requestEpoch, - const std::vector<BSONObj>& chunkBoundaries, - const std::string& shardName) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -StatusWith<BSONObj> ShardingCatalogManagerMock::commitChunkMigration( - OperationContext* txn, - const NamespaceString&, - const ChunkType&, - const boost::optional<ChunkType>&, - const OID& collectionEpoch, - const ShardId&, - const ShardId&) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {} - -Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -void ShardingCatalogManagerMock::discardCachedConfigDatabaseInitializationState() {} - -Status ShardingCatalogManagerMock::initializeShardingAwarenessOnUnawareShards( - OperationContext* txn) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -Status 
ShardingCatalogManagerMock::upsertShardIdentityOnShard(OperationContext* txn, - ShardType shardType) { - return {ErrorCodes::InternalError, "Method not implemented"}; -} - -BSONObj ShardingCatalogManagerMock::createShardIdentityUpsertForAddShard( - OperationContext* txn, const std::string& shardName) { - MONGO_UNREACHABLE; -} - -void ShardingCatalogManagerMock::cancelAddShardTaskIfNeeded(const ShardId& shardId) { - MONGO_UNREACHABLE; -} - -Status ShardingCatalogManagerMock::setFeatureCompatibilityVersionOnShards( - OperationContext* txn, const std::string& version) { - MONGO_UNREACHABLE; -} - -} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h deleted file mode 100644 index 55d121fe31e..00000000000 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. 
If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <boost/optional.hpp> - -#include "mongo/base/status_with.h" -#include "mongo/bson/bsonobj.h" -#include "mongo/db/s/type_shard_identity.h" -#include "mongo/s/catalog/sharding_catalog_manager.h" -#include "mongo/s/catalog/type_shard.h" - -namespace mongo { - -/** - * A dummy implementation of ShardingCatalogManager for testing purposes. - */ -class ShardingCatalogManagerMock : public ShardingCatalogManager { -public: - ShardingCatalogManagerMock(); - ~ShardingCatalogManagerMock(); - - Status startup() override; - - void shutDown(OperationContext* txn) override; - - StatusWith<std::string> addShard(OperationContext* txn, - const std::string* shardProposedName, - const ConnectionString& shardConnectionString, - const long long maxSize) override; - - Status addShardToZone(OperationContext* txn, - const std::string& shardName, - const std::string& zoneName) override; - - Status removeShardFromZone(OperationContext* txn, - const std::string& shardName, - const std::string& zoneName) override; - - Status assignKeyRangeToZone(OperationContext* txn, - const NamespaceString& ns, - const ChunkRange& range, - const std::string& zoneName) override; - - Status removeKeyRangeFromZone(OperationContext* txn, - const NamespaceString& ns, - const ChunkRange& range) override; - - Status commitChunkSplit(OperationContext* txn, - const NamespaceString& ns, - const OID& requestEpoch, - const ChunkRange& range, - const std::vector<BSONObj>& splitPoints, - const std::string& shardName) override; - - Status commitChunkMerge(OperationContext* txn, - const NamespaceString& ns, - const OID& requestEpoch, - 
const std::vector<BSONObj>& chunkBoundaries, - const std::string& shardName) override; - - StatusWith<BSONObj> commitChunkMigration(OperationContext* txn, - const NamespaceString& nss, - const ChunkType& migratedChunk, - const boost::optional<ChunkType>& controlChunk, - const OID& collectionEpoch, - const ShardId& fromShard, - const ShardId& toShard) override; - - void appendConnectionStats(executor::ConnectionPoolStats* stats) override; - - Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; - - void discardCachedConfigDatabaseInitializationState() override; - - Status initializeShardingAwarenessOnUnawareShards(OperationContext* txn) override; - - Status upsertShardIdentityOnShard(OperationContext* txn, ShardType shardType) override; - - BSONObj createShardIdentityUpsertForAddShard(OperationContext* txn, - const std::string& shardName) override; - - void cancelAddShardTaskIfNeeded(const ShardId& shardId) override; - - Status setFeatureCompatibilityVersionOnShards(OperationContext* txn, - const std::string& version) override; -}; - -} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp new file mode 100644 index 00000000000..3a7aef9c9df --- /dev/null +++ b/src/mongo/s/catalog/sharding_catalog_manager_shard_operations_impl.cpp @@ -0,0 +1,1063 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/sharding_catalog_manager_impl.h" + +#include <iomanip> +#include <set> + +#include "mongo/base/status_with.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/client/connection_string.h" +#include "mongo/client/read_preference.h" +#include "mongo/client/remote_command_targeter.h" +#include "mongo/client/replica_set_monitor.h" +#include "mongo/db/client.h" +#include "mongo/db/commands/feature_compatibility_version.h" +#include "mongo/db/commands/feature_compatibility_version_command_parser.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/type_shard_identity.h" +#include "mongo/db/wire_version.h" +#include "mongo/executor/task_executor.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include 
"mongo/s/catalog/config_server_version.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_database.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_identity_loader.h" +#include "mongo/s/grid.h" +#include "mongo/s/write_ops/batched_command_request.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace { + +using CallbackHandle = executor::TaskExecutor::CallbackHandle; +using CallbackArgs = executor::TaskExecutor::CallbackArgs; +using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; +using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; + +const Seconds kDefaultFindHostMaxWaitTime(20); + +const ReadPreferenceSetting kConfigReadSelector(ReadPreference::Nearest, TagSet{}); +const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); + +MONGO_FP_DECLARE(dontUpsertShardIdentityOnNewShards); + +/** + * Generates a unique name to be given to a newly added shard. 
+ */ +StatusWith<std::string> generateNewShardName(OperationContext* txn) { + BSONObjBuilder shardNameRegex; + shardNameRegex.appendRegex(ShardType::name(), "^shard"); + + auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + kConfigReadSelector, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString(ShardType::ConfigNS), + shardNameRegex.obj(), + BSON(ShardType::name() << -1), + 1); + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& docs = findStatus.getValue().docs; + + int count = 0; + if (!docs.empty()) { + const auto shardStatus = ShardType::fromBSON(docs.front()); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); + } + + std::istringstream is(shardStatus.getValue().getName().substr(5)); + is >> count; + count++; + } + + // TODO: fix so that we can have more than 10000 automatically generated shard names + if (count < 9999) { + std::stringstream ss; + ss << "shard" << std::setfill('0') << std::setw(4) << count; + return ss.str(); + } + + return Status(ErrorCodes::OperationFailed, "unable to generate new shard name"); +} + +} // namespace + +StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAddShard( + OperationContext* txn, + RemoteCommandTargeter* targeter, + const std::string& dbName, + const BSONObj& cmdObj) { + auto host = targeter->findHost(txn, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); + if (!host.isOK()) { + return host.getStatus(); + } + + executor::RemoteCommandRequest request( + host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); + executor::RemoteCommandResponse swResponse = + Status(ErrorCodes::InternalError, "Internal error running command"); + + auto callStatus = _executorForAddShard->scheduleRemoteCommand( + request, [&swResponse](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { + swResponse = args.response; + }); + if (!callStatus.isOK()) { + return 
callStatus.getStatus(); + } + + // Block until the command is carried out + _executorForAddShard->wait(callStatus.getValue()); + + if (!swResponse.isOK()) { + if (swResponse.status.compareCode(ErrorCodes::ExceededTimeLimit)) { + LOG(0) << "Operation for addShard timed out with status " << swResponse.status; + } + if (!Shard::shouldErrorBePropagated(swResponse.status.code())) { + swResponse.status = {ErrorCodes::OperationFailed, + str::stream() << "failed to run command " << cmdObj + << " when attempting to add shard " + << targeter->connectionString().toString() + << causedBy(swResponse.status)}; + } + return swResponse.status; + } + + BSONObj responseObj = swResponse.data.getOwned(); + BSONObj responseMetadata = swResponse.metadata.getOwned(); + + Status commandStatus = getStatusFromCommandResult(responseObj); + if (!Shard::shouldErrorBePropagated(commandStatus.code())) { + commandStatus = {ErrorCodes::OperationFailed, + str::stream() << "failed to run command " << cmdObj + << " when attempting to add shard " + << targeter->connectionString().toString() + << causedBy(commandStatus)}; + } + + Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); + if (!Shard::shouldErrorBePropagated(writeConcernStatus.code())) { + writeConcernStatus = {ErrorCodes::OperationFailed, + str::stream() << "failed to satisfy writeConcern for command " + << cmdObj + << " when attempting to add shard " + << targeter->connectionString().toString() + << causedBy(writeConcernStatus)}; + } + + return Shard::CommandResponse(std::move(responseObj), + std::move(responseMetadata), + std::move(commandStatus), + std::move(writeConcernStatus)); +} + +StatusWith<boost::optional<ShardType>> ShardingCatalogManagerImpl::_checkIfShardExists( + OperationContext* txn, + const ConnectionString& proposedShardConnectionString, + const std::string* proposedShardName, + long long proposedShardMaxSize) { + // Check whether any host in the connection is already part of the cluster. 
+ const auto existingShards = Grid::get(txn)->catalogClient(txn)->getAllShards( + txn, repl::ReadConcernLevel::kLocalReadConcern); + if (!existingShards.isOK()) { + return Status(existingShards.getStatus().code(), + str::stream() << "Failed to load existing shards during addShard" + << causedBy(existingShards.getStatus().reason())); + } + + // Now check if this shard already exists - if it already exists *with the same options* then + // the addShard request can return success early without doing anything more. + for (const auto& existingShard : existingShards.getValue().value) { + auto swExistingShardConnStr = ConnectionString::parse(existingShard.getHost()); + if (!swExistingShardConnStr.isOK()) { + return swExistingShardConnStr.getStatus(); + } + auto existingShardConnStr = std::move(swExistingShardConnStr.getValue()); + + // Function for determining if the options for the shard that is being added match the + // options of an existing shard that conflicts with it. + auto shardsAreEquivalent = [&]() { + if (proposedShardName && *proposedShardName != existingShard.getName()) { + return false; + } + if (proposedShardConnectionString.type() != existingShardConnStr.type()) { + return false; + } + if (proposedShardConnectionString.type() == ConnectionString::SET && + proposedShardConnectionString.getSetName() != existingShardConnStr.getSetName()) { + return false; + } + if (proposedShardMaxSize != existingShard.getMaxSizeMB()) { + return false; + } + return true; + }; + + if (existingShardConnStr.type() == ConnectionString::SET && + proposedShardConnectionString.type() == ConnectionString::SET && + existingShardConnStr.getSetName() == proposedShardConnectionString.getSetName()) { + // An existing shard has the same replica set name as the shard being added. + // If the options aren't the same, then this is an error, + // but if the options match then the addShard operation should be immediately + // considered a success and terminated. 
+ if (shardsAreEquivalent()) { + return {existingShard}; + } else { + return {ErrorCodes::IllegalOperation, + str::stream() << "A shard already exists containing the replica set '" + << existingShardConnStr.getSetName() + << "'"}; + } + } + + for (const auto& existingHost : existingShardConnStr.getServers()) { + // Look if any of the hosts in the existing shard are present within the shard trying + // to be added. + for (const auto& addingHost : proposedShardConnectionString.getServers()) { + if (existingHost == addingHost) { + // At least one of the hosts in the shard being added already exists in an + // existing shard. If the options aren't the same, then this is an error, + // but if the options match then the addShard operation should be immediately + // considered a success and terminated. + if (shardsAreEquivalent()) { + return {existingShard}; + } else { + return {ErrorCodes::IllegalOperation, + str::stream() << "'" << addingHost.toString() << "' " + << "is already a member of the existing shard '" + << existingShard.getHost() + << "' (" + << existingShard.getName() + << ")."}; + } + } + } + } + + if (proposedShardName && *proposedShardName == existingShard.getName()) { + // If we get here then we're trying to add a shard with the same name as an existing + // shard, but there was no overlap in the hosts between the existing shard and the + // proposed connection string for the new shard. + return {ErrorCodes::IllegalOperation, + str::stream() << "A shard named " << *proposedShardName << " already exists"}; + } + } + + return {boost::none}; +} + +StatusWith<ShardType> ShardingCatalogManagerImpl::_validateHostAsShard( + OperationContext* txn, + std::shared_ptr<RemoteCommandTargeter> targeter, + const std::string* shardProposedName, + const ConnectionString& connectionString) { + + // Check if the node being added is a mongos or a version of mongod too old to speak the current + // communication protocol. 
+ auto swCommandResponse = + _runCommandForAddShard(txn, targeter.get(), "admin", BSON("isMaster" << 1)); + if (!swCommandResponse.isOK()) { + if (swCommandResponse.getStatus() == ErrorCodes::RPCProtocolNegotiationFailed) { + // Mongos to mongos commands are no longer supported in the wire protocol + // (because mongos does not support OP_COMMAND), similarly for a new mongos + // and an old mongod. So the call will fail in such cases. + // TODO: If/When mongos ever supports opCommands, this logic will break because + // cmdStatus will be OK. + return {ErrorCodes::RPCProtocolNegotiationFailed, + str::stream() << targeter->connectionString().toString() + << " does not recognize the RPC protocol being used. This is" + << " likely because it contains a node that is a mongos or an old" + << " version of mongod."}; + } else { + return swCommandResponse.getStatus(); + } + } + + // Check for a command response error + auto resIsMasterStatus = std::move(swCommandResponse.getValue().commandStatus); + if (!resIsMasterStatus.isOK()) { + return {resIsMasterStatus.code(), + str::stream() << "Error running isMaster against " + << targeter->connectionString().toString() + << ": " + << causedBy(resIsMasterStatus)}; + } + + auto resIsMaster = std::move(swCommandResponse.getValue().response); + + // Check that the node being added is a new enough version. + // If we're running this code, that means the mongos that the addShard request originated from + // must be at least version 3.4 (since 3.2 mongoses don't know about the _configsvrAddShard + // command). Since it is illegal to have v3.4 mongoses with v3.2 shards, we should reject + // adding any shards that are not v3.4. We can determine this by checking that the + // maxWireVersion reported in isMaster is at least COMMANDS_ACCEPT_WRITE_CONCERN. + // TODO(SERVER-25623): This approach won't work to prevent v3.6 mongoses from adding v3.4 + // shards, so we'll have to rethink this during the 3.5 development cycle. 
+ + long long maxWireVersion; + Status status = bsonExtractIntegerField(resIsMaster, "maxWireVersion", &maxWireVersion); + if (!status.isOK()) { + return Status(status.code(), + str::stream() << "isMaster returned invalid 'maxWireVersion' " + << "field when attempting to add " + << connectionString.toString() + << " as a shard: " + << status.reason()); + } + if (maxWireVersion < WireVersion::COMMANDS_ACCEPT_WRITE_CONCERN) { + return Status(ErrorCodes::IncompatibleServerVersion, + str::stream() << "Cannot add " << connectionString.toString() + << " as a shard because we detected a mongod with server " + "version older than 3.4.0. It is invalid to add v3.2 and " + "older shards through a v3.4 mongos."); + } + + + // Check whether there is a master. If there isn't, the replica set may not have been + // initiated. If the connection is a standalone, it will return true for isMaster. + bool isMaster; + status = bsonExtractBooleanField(resIsMaster, "ismaster", &isMaster); + if (!status.isOK()) { + return Status(status.code(), + str::stream() << "isMaster returned invalid 'ismaster' " + << "field when attempting to add " + << connectionString.toString() + << " as a shard: " + << status.reason()); + } + if (!isMaster) { + return {ErrorCodes::NotMaster, + str::stream() + << connectionString.toString() + << " does not have a master. 
If this is a replica set, ensure that it has a" + << " healthy primary and that the set has been properly initiated."}; + } + + const std::string providedSetName = connectionString.getSetName(); + const std::string foundSetName = resIsMaster["setName"].str(); + + // Make sure the specified replica set name (if any) matches the actual shard's replica set + if (providedSetName.empty() && !foundSetName.empty()) { + return {ErrorCodes::OperationFailed, + str::stream() << "host is part of set " << foundSetName << "; " + << "use replica set url format " + << "<setname>/<server1>,<server2>, ..."}; + } + + if (!providedSetName.empty() && foundSetName.empty()) { + return {ErrorCodes::OperationFailed, + str::stream() << "host did not return a set name; " + << "is the replica set still initializing? " + << resIsMaster}; + } + + // Make sure the set name specified in the connection string matches the one where its hosts + // belong into + if (!providedSetName.empty() && (providedSetName != foundSetName)) { + return {ErrorCodes::OperationFailed, + str::stream() << "the provided connection string (" << connectionString.toString() + << ") does not match the actual set name " + << foundSetName}; + } + + // Is it a config server? + if (resIsMaster.hasField("configsvr")) { + return {ErrorCodes::OperationFailed, + str::stream() << "Cannot add " << connectionString.toString() + << " as a shard since it is a config server"}; + } + + // If the shard is part of a replica set, make sure all the hosts mentioned in the connection + // string are part of the set. It is fine if not all members of the set are mentioned in the + // connection string, though. 
+ if (!providedSetName.empty()) { + std::set<std::string> hostSet; + + BSONObjIterator iter(resIsMaster["hosts"].Obj()); + while (iter.more()) { + hostSet.insert(iter.next().String()); // host:port + } + + if (resIsMaster["passives"].isABSONObj()) { + BSONObjIterator piter(resIsMaster["passives"].Obj()); + while (piter.more()) { + hostSet.insert(piter.next().String()); // host:port + } + } + + if (resIsMaster["arbiters"].isABSONObj()) { + BSONObjIterator piter(resIsMaster["arbiters"].Obj()); + while (piter.more()) { + hostSet.insert(piter.next().String()); // host:port + } + } + + for (const auto& hostEntry : connectionString.getServers()) { + const auto& host = hostEntry.toString(); // host:port + if (hostSet.find(host) == hostSet.end()) { + return {ErrorCodes::OperationFailed, + str::stream() << "in seed list " << connectionString.toString() << ", host " + << host + << " does not belong to replica set " + << foundSetName + << "; found " + << resIsMaster.toString()}; + } + } + } + + std::string actualShardName; + + if (shardProposedName) { + actualShardName = *shardProposedName; + } else if (!foundSetName.empty()) { + // Default it to the name of the replica set + actualShardName = foundSetName; + } + + // Disallow adding shard replica set with name 'config' + if (actualShardName == NamespaceString::kConfigDb) { + return {ErrorCodes::BadValue, "use of shard replica set with name 'config' is not allowed"}; + } + + // Retrieve the most up to date connection string that we know from the replica set monitor (if + // this is a replica set shard, otherwise it will be the same value as connectionString). 
+ ConnectionString actualShardConnStr = targeter->connectionString(); + + ShardType shard; + shard.setName(actualShardName); + shard.setHost(actualShardConnStr.toString()); + shard.setState(ShardType::ShardState::kShardAware); + + return shard; +} + +StatusWith<std::vector<std::string>> ShardingCatalogManagerImpl::_getDBNamesListFromShard( + OperationContext* txn, std::shared_ptr<RemoteCommandTargeter> targeter) { + + auto swCommandResponse = + _runCommandForAddShard(txn, targeter.get(), "admin", BSON("listDatabases" << 1)); + if (!swCommandResponse.isOK()) { + return swCommandResponse.getStatus(); + } + + auto cmdStatus = std::move(swCommandResponse.getValue().commandStatus); + if (!cmdStatus.isOK()) { + return cmdStatus; + } + + auto cmdResult = std::move(swCommandResponse.getValue().response); + + std::vector<std::string> dbNames; + + for (const auto& dbEntry : cmdResult["databases"].Obj()) { + const auto& dbName = dbEntry["name"].String(); + + if (!(dbName == NamespaceString::kAdminDb || dbName == NamespaceString::kLocalDb)) { + dbNames.push_back(dbName); + } + } + + return dbNames; +} + +StatusWith<std::string> ShardingCatalogManagerImpl::addShard( + OperationContext* txn, + const std::string* shardProposedName, + const ConnectionString& shardConnectionString, + const long long maxSize) { + if (shardConnectionString.type() == ConnectionString::INVALID) { + return {ErrorCodes::BadValue, "Invalid connection string"}; + } + + if (shardProposedName && shardProposedName->empty()) { + return {ErrorCodes::BadValue, "shard name cannot be empty"}; + } + + // Only one addShard operation can be in progress at a time. + Lock::ExclusiveLock lk(txn->lockState(), _kShardMembershipLock); + + // Check if this shard has already been added (can happen in the case of a retry after a network + // error, for example) and thus this addShard request should be considered a no-op. 
+ auto existingShard = + _checkIfShardExists(txn, shardConnectionString, shardProposedName, maxSize); + if (!existingShard.isOK()) { + return existingShard.getStatus(); + } + if (existingShard.getValue()) { + // These hosts already belong to an existing shard, so report success and terminate the + // addShard request. Make sure to set the last optime for the client to the system last + // optime so that we'll still wait for replication so that this state is visible in the + // committed snapshot. + repl::ReplClientInfo::forClient(txn->getClient()).setLastOpToSystemLastOpTime(txn); + return existingShard.getValue()->getName(); + } + + // Force a reload of the ShardRegistry to ensure that, in case this addShard is to re-add a + // replica set that has recently been removed, we have detached the ReplicaSetMonitor for the + // set with that setName from the ReplicaSetMonitorManager and will create a new + // ReplicaSetMonitor when targeting the set below. + // Note: This is necessary because as of 3.4, removeShard is performed by mongos (unlike + // addShard), so the ShardRegistry is not synchronously reloaded on the config server when a + // shard is removed. + if (!Grid::get(txn)->shardRegistry()->reload(txn)) { + // If the first reload joined an existing one, call reload again to ensure the reload is + // fresh. + Grid::get(txn)->shardRegistry()->reload(txn); + } + + // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. + const std::shared_ptr<Shard> shard{ + Grid::get(txn)->shardRegistry()->createConnection(shardConnectionString)}; + invariant(shard); + auto targeter = shard->getTargeter(); + + auto stopMonitoringGuard = MakeGuard([&] { + if (shardConnectionString.type() == ConnectionString::SET) { + // This is a workaround for the case were we could have some bad shard being + // requested to be added and we put that bad connection string on the global replica set + // monitor registry. 
It needs to be cleaned up so that when a correct replica set is + // added, it will be recreated. + ReplicaSetMonitor::remove(shardConnectionString.getSetName()); + } + }); + + // Validate the specified connection string may serve as shard at all + auto shardStatus = + _validateHostAsShard(txn, targeter, shardProposedName, shardConnectionString); + if (!shardStatus.isOK()) { + return shardStatus.getStatus(); + } + ShardType& shardType = shardStatus.getValue(); + + // Check that none of the existing shard candidate's dbs exist already + auto dbNamesStatus = _getDBNamesListFromShard(txn, targeter); + if (!dbNamesStatus.isOK()) { + return dbNamesStatus.getStatus(); + } + + for (const auto& dbName : dbNamesStatus.getValue()) { + auto dbt = Grid::get(txn)->catalogClient(txn)->getDatabase(txn, dbName); + if (dbt.isOK()) { + const auto& dbDoc = dbt.getValue().value; + return Status(ErrorCodes::OperationFailed, + str::stream() << "can't add shard " + << "'" + << shardConnectionString.toString() + << "'" + << " because a local database '" + << dbName + << "' exists in another " + << dbDoc.getPrimary()); + } else if (dbt != ErrorCodes::NamespaceNotFound) { + return dbt.getStatus(); + } + } + + // If a name for a shard wasn't provided, generate one + if (shardType.getName().empty()) { + auto result = generateNewShardName(txn); + if (!result.isOK()) { + return result.getStatus(); + } + shardType.setName(result.getValue()); + } + + if (maxSize > 0) { + shardType.setMaxSizeMB(maxSize); + } + + // If the minimum allowed version for the cluster is 3.4, set the featureCompatibilityVersion to + // 3.4 on the shard. 
+ if (serverGlobalParams.featureCompatibility.version.load() == + ServerGlobalParams::FeatureCompatibility::Version::k34) { + auto versionResponse = + _runCommandForAddShard(txn, + targeter.get(), + "admin", + BSON(FeatureCompatibilityVersion::kCommandName + << FeatureCompatibilityVersionCommandParser::kVersion34)); + if (!versionResponse.isOK()) { + return versionResponse.getStatus(); + } + + if (!versionResponse.getValue().commandStatus.isOK()) { + if (versionResponse.getStatus().code() == ErrorCodes::CommandNotFound) { + return {ErrorCodes::OperationFailed, + "featureCompatibilityVersion for cluster is 3.4, cannot add a shard with " + "version below 3.4. See " + "http://dochub.mongodb.org/core/3.4-feature-compatibility."}; + } + return versionResponse.getValue().commandStatus; + } + } + + if (!MONGO_FAIL_POINT(dontUpsertShardIdentityOnNewShards)) { + auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName()); + + LOG(2) << "going to insert shardIdentity document into shard: " << shardType; + + auto swCommandResponse = + _runCommandForAddShard(txn, targeter.get(), "admin", commandRequest); + if (!swCommandResponse.isOK()) { + return swCommandResponse.getStatus(); + } + + auto commandResponse = std::move(swCommandResponse.getValue()); + + BatchedCommandResponse batchResponse; + auto batchResponseStatus = + Shard::CommandResponse::processBatchWriteResponse(commandResponse, &batchResponse); + if (!batchResponseStatus.isOK()) { + return batchResponseStatus; + } + } + + log() << "going to insert new entry for shard into config.shards: " << shardType.toString(); + + Status result = Grid::get(txn)->catalogClient(txn)->insertConfigDocument( + txn, ShardType::ConfigNS, shardType.toBSON(), ShardingCatalogClient::kMajorityWriteConcern); + if (!result.isOK()) { + log() << "error adding shard: " << shardType.toBSON() << " err: " << result.reason(); + return result; + } + + // Add all databases which were discovered on the new shard + for (const auto& 
dbName : dbNamesStatus.getValue()) { + DatabaseType dbt; + dbt.setName(dbName); + dbt.setPrimary(shardType.getName()); + dbt.setSharded(false); + + Status status = Grid::get(txn)->catalogClient(txn)->updateDatabase(txn, dbName, dbt); + if (!status.isOK()) { + log() << "adding shard " << shardConnectionString.toString() + << " even though could not add database " << dbName; + } + } + + // Record in changelog + BSONObjBuilder shardDetails; + shardDetails.append("name", shardType.getName()); + shardDetails.append("host", shardConnectionString.toString()); + + Grid::get(txn)->catalogClient(txn)->logChange( + txn, "addShard", "", shardDetails.obj(), ShardingCatalogClient::kMajorityWriteConcern); + + // Ensure the added shard is visible to this process. + auto shardRegistry = Grid::get(txn)->shardRegistry(); + if (!shardRegistry->getShard(txn, shardType.getName()).isOK()) { + return {ErrorCodes::OperationFailed, + "Could not find shard metadata for shard after adding it. This most likely " + "indicates that the shard was removed immediately after it was added."}; + } + stopMonitoringGuard.Dismiss(); + + return shardType.getName(); +} + +void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { + _executorForAddShard->appendConnectionStats(stats); +} + +Status ShardingCatalogManagerImpl::initializeShardingAwarenessOnUnawareShards( + OperationContext* txn) { + auto swShards = _getAllShardingUnawareShards(txn); + if (!swShards.isOK()) { + return swShards.getStatus(); + } else { + auto shards = std::move(swShards.getValue()); + for (const auto& shard : shards) { + auto status = upsertShardIdentityOnShard(txn, shard); + if (!status.isOK()) { + return status; + } + } + } + + // Note: this OK status means only that tasks to initialize sharding awareness on the shards + // were scheduled against the task executor, not that the tasks actually succeeded. 
+ return Status::OK(); +} + +StatusWith<std::vector<ShardType>> ShardingCatalogManagerImpl::_getAllShardingUnawareShards( + OperationContext* txn) { + std::vector<ShardType> shards; + auto findStatus = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ShardType::ConfigNS), + BSON( + "state" << BSON("$ne" << static_cast<std::underlying_type<ShardType::ShardState>::type>( + ShardType::ShardState::kShardAware))), // shard is sharding unaware + BSONObj(), // no sort + boost::none); // no limit + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + for (const BSONObj& doc : findStatus.getValue().docs) { + auto shardRes = ShardType::fromBSON(doc); + if (!shardRes.isOK()) { + return {ErrorCodes::FailedToParse, + str::stream() << "Failed to parse shard " << causedBy(shardRes.getStatus()) + << doc}; + } + + Status validateStatus = shardRes.getValue().validate(); + if (!validateStatus.isOK()) { + return {validateStatus.code(), + str::stream() << "Failed to validate shard " << causedBy(validateStatus) + << doc}; + } + + shards.push_back(shardRes.getValue()); + } + + return shards; +} + +Status ShardingCatalogManagerImpl::upsertShardIdentityOnShard(OperationContext* txn, + ShardType shardType) { + + auto commandRequest = createShardIdentityUpsertForAddShard(txn, shardType.getName()); + + auto swConnString = ConnectionString::parse(shardType.getHost()); + if (!swConnString.isOK()) { + return swConnString.getStatus(); + } + + // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter + // instead. 
+ const std::shared_ptr<Shard> shard{ + Grid::get(txn)->shardRegistry()->createConnection(swConnString.getValue())}; + invariant(shard); + auto targeter = shard->getTargeter(); + + _scheduleAddShardTask( + std::move(shardType), std::move(targeter), std::move(commandRequest), false); + + return Status::OK(); +} + +void ShardingCatalogManagerImpl::cancelAddShardTaskIfNeeded(const ShardId& shardId) { + stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); + if (_hasAddShardHandle_inlock(shardId)) { + auto cbHandle = _getAddShardHandle_inlock(shardId); + _executorForAddShard->cancel(cbHandle); + // Untrack the handle here so that if this shard is re-added before the CallbackCanceled + // status is delivered to the callback, a new addShard task for the shard will be + // created. + _untrackAddShardHandle_inlock(shardId); + } +} + +void ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled( + const CallbackArgs& cbArgs, + const ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter, + const BSONObj commandRequest) { + if (cbArgs.status == ErrorCodes::CallbackCanceled) { + return; + } + _scheduleAddShardTask( + std::move(shardType), std::move(targeter), std::move(commandRequest), true); +} + +void ShardingCatalogManagerImpl::_scheduleAddShardTask( + const ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter, + const BSONObj commandRequest, + const bool isRetry) { + stdx::lock_guard<stdx::mutex> lk(_addShardHandlesMutex); + + if (isRetry) { + // Untrack the handle from scheduleWorkAt, and schedule a new addShard task. + _untrackAddShardHandle_inlock(shardType.getName()); + } else { + // We should never be able to schedule an addShard task while one is running, because + // there is a unique index on the _id field in config.shards. + invariant(!_hasAddShardHandle_inlock(shardType.getName())); + } + + // Schedule the shardIdentity upsert request to run immediately, and track the handle. 
+ + auto swHost = targeter->findHostWithMaxWait(ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + Milliseconds(kDefaultFindHostMaxWaitTime)); + if (!swHost.isOK()) { + // A 3.2 mongos must have previously successfully communicated with hosts in this shard, + // so a failure to find a host here is probably transient, and it is safe to retry. + warning() << "Failed to find host for shard " << shardType + << " when trying to upsert a shardIdentity document, " + << causedBy(swHost.getStatus()); + const Date_t now = _executorForAddShard->now(); + const Date_t when = now + getAddShardTaskRetryInterval(); + _trackAddShardHandle_inlock( + shardType.getName(), + _executorForAddShard->scheduleWorkAt( + when, + stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, + this, + stdx::placeholders::_1, + shardType, + std::move(targeter), + std::move(commandRequest)))); + return; + } + + executor::RemoteCommandRequest request( + swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); + + const RemoteCommandCallbackFn callback = + stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse, + this, + stdx::placeholders::_1, + shardType, + std::move(targeter)); + + if (isRetry) { + log() << "Retrying upsert of shardIdentity document into shard " << shardType.getName(); + } + _trackAddShardHandle_inlock(shardType.getName(), + _executorForAddShard->scheduleRemoteCommand(request, callback)); +} + +void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( + const RemoteCommandCallbackArgs& cbArgs, + ShardType shardType, + std::shared_ptr<RemoteCommandTargeter> targeter) { + stdx::unique_lock<stdx::mutex> lk(_addShardHandlesMutex); + + // If the callback has been canceled (either due to shutdown or the shard being removed), we + // do not need to reschedule the task or update config.shards. 
+ Status responseStatus = cbArgs.response.status; + if (responseStatus == ErrorCodes::CallbackCanceled) { + return; + } + + // If the handle no longer exists, the shard must have been removed, but the callback must not + // have been canceled until after the task had completed. In this case as well, we do not need + // to reschedule the task or update config.shards. + if (!_hasAddShardHandle_inlock(shardType.getName())) { + return; + } + + // Untrack the handle from scheduleRemoteCommand regardless of whether the command + // succeeded. If it failed, we will track the handle for the rescheduled task before + // releasing the mutex. + _untrackAddShardHandle_inlock(shardType.getName()); + + // Examine the response to determine if the upsert succeeded. + + bool rescheduleTask = false; + + auto swResponse = cbArgs.response; + if (!swResponse.isOK()) { + warning() << "Failed to upsert shardIdentity document during addShard into shard " + << shardType.getName() << "(" << shardType.getHost() + << "). The shardIdentity upsert will continue to be retried. " + << causedBy(swResponse.status); + rescheduleTask = true; + } else { + // Create a CommandResponse object in order to use processBatchWriteResponse. 
+ BSONObj responseObj = swResponse.data.getOwned(); + BSONObj responseMetadata = swResponse.metadata.getOwned(); + Status commandStatus = getStatusFromCommandResult(responseObj); + Status writeConcernStatus = getWriteConcernStatusFromCommandResult(responseObj); + Shard::CommandResponse commandResponse(std::move(responseObj), + std::move(responseMetadata), + std::move(commandStatus), + std::move(writeConcernStatus)); + + BatchedCommandResponse batchResponse; + auto batchResponseStatus = + Shard::CommandResponse::processBatchWriteResponse(commandResponse, &batchResponse); + if (!batchResponseStatus.isOK()) { + if (batchResponseStatus == ErrorCodes::DuplicateKey) { + warning() + << "Received duplicate key error when inserting the shardIdentity " + "document into " + << shardType.getName() << "(" << shardType.getHost() + << "). This means the shard has a shardIdentity document with a clusterId " + "that differs from this cluster's clusterId. It may still belong to " + "or not have been properly removed from another cluster. The " + "shardIdentity upsert will continue to be retried."; + } else { + warning() << "Failed to upsert shardIdentity document into shard " + << shardType.getName() << "(" << shardType.getHost() + << ") during addShard. The shardIdentity upsert will continue to be " + "retried. " + << causedBy(batchResponseStatus); + } + rescheduleTask = true; + } + } + + if (rescheduleTask) { + // If the command did not succeed, schedule the upsert shardIdentity task again with a + // delay. + const Date_t now = _executorForAddShard->now(); + const Date_t when = now + getAddShardTaskRetryInterval(); + + // Track the handle from scheduleWorkAt. 
+ _trackAddShardHandle_inlock( + shardType.getName(), + _executorForAddShard->scheduleWorkAt( + when, + stdx::bind(&ShardingCatalogManagerImpl::_scheduleAddShardTaskUnlessCanceled, + this, + stdx::placeholders::_1, + shardType, + std::move(targeter), + std::move(cbArgs.request.cmdObj)))); + return; + } + + // If the command succeeded, update config.shards to mark the shard as shardAware. + + // Release the _addShardHandlesMutex before updating config.shards, since it involves disk + // I/O. + // At worst, a redundant addShard task will be scheduled by a new primary if the current + // primary fails during that write. + lk.unlock(); + + // This thread is part of a thread pool owned by the addShard TaskExecutor. Threads in that + // pool are not created with Client objects associated with them, so a Client is created and + // attached here to do the local update. The Client is destroyed at the end of the scope, + // leaving the thread state as it was before. + Client::initThread(getThreadName().c_str()); + ON_BLOCK_EXIT([&] { Client::destroy(); }); + + // Use the thread's Client to create an OperationContext to perform the local write to + // config.shards. This OperationContext will automatically be destroyed when it goes out of + // scope at the end of this code block. + auto txnPtr = cc().makeOperationContext(); + + // Use kNoWaitWriteConcern to prevent waiting in this callback, since we don't handle a + // failed response anyway. If the write is rolled back, the new config primary will attempt to + // initialize sharding awareness on this shard again, and this update to config.shards will + // be automatically retried then. If it fails because the shard was removed through the normal + // removeShard path (so the entry in config.shards was deleted), no new addShard task will + // get scheduled on the next transition to primary. 
+ auto updateStatus = + Grid::get(txnPtr.get()) + ->catalogClient(txnPtr.get()) + ->updateConfigDocument( + txnPtr.get(), + ShardType::ConfigNS, + BSON(ShardType::name(shardType.getName())), + BSON("$set" << BSON( + ShardType::state() + << static_cast<std::underlying_type<ShardType::ShardState>::type>( + ShardType::ShardState::kShardAware))), + false, + kNoWaitWriteConcern); + + if (!updateStatus.isOK()) { + warning() << "Failed to mark shard " << shardType.getName() << "(" << shardType.getHost() + << ") as shardAware in config.shards. This will be retried the next time a " + "config server transitions to primary. " + << causedBy(updateStatus.getStatus()); + } +} + +BSONObj ShardingCatalogManagerImpl::createShardIdentityUpsertForAddShard( + OperationContext* txn, const std::string& shardName) { + std::unique_ptr<BatchedUpdateDocument> updateDoc(new BatchedUpdateDocument()); + + BSONObjBuilder query; + query.append("_id", "shardIdentity"); + query.append(ShardIdentityType::shardName(), shardName); + query.append(ShardIdentityType::clusterId(), ClusterIdentityLoader::get(txn)->getClusterId()); + updateDoc->setQuery(query.obj()); + + BSONObjBuilder update; + { + BSONObjBuilder set(update.subobjStart("$set")); + set.append(ShardIdentityType::configsvrConnString(), + Grid::get(txn)->shardRegistry()->getConfigServerConnectionString().toString()); + } + updateDoc->setUpdateExpr(update.obj()); + updateDoc->setUpsert(true); + + std::unique_ptr<BatchedUpdateRequest> updateRequest(new BatchedUpdateRequest()); + updateRequest->addToUpdates(updateDoc.release()); + + BatchedCommandRequest commandRequest(updateRequest.release()); + commandRequest.setNS(NamespaceString::kConfigCollectionNamespace); + commandRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON()); + + return commandRequest.toBSON(); +} + +bool ShardingCatalogManagerImpl::_hasAddShardHandle_inlock(const ShardId& shardId) { + return _addShardHandles.find(shardId) != _addShardHandles.end(); +} + 
+const CallbackHandle& ShardingCatalogManagerImpl::_getAddShardHandle_inlock( + const ShardId& shardId) { + invariant(_hasAddShardHandle_inlock(shardId)); + return _addShardHandles.find(shardId)->second; +} + +void ShardingCatalogManagerImpl::_trackAddShardHandle_inlock( + const ShardId shardId, const StatusWith<CallbackHandle>& swHandle) { + if (swHandle.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(40219, swHandle.getStatus()); + _addShardHandles.insert(std::pair<ShardId, CallbackHandle>(shardId, swHandle.getValue())); +} + +void ShardingCatalogManagerImpl::_untrackAddShardHandle_inlock(const ShardId& shardId) { + auto it = _addShardHandles.find(shardId); + invariant(it != _addShardHandles.end()); + _addShardHandles.erase(shardId); +} + +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager_zone_operations_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_zone_operations_impl.cpp new file mode 100644 index 00000000000..6af584d3c30 --- /dev/null +++ b/src/mongo/s/catalog/sharding_catalog_manager_zone_operations_impl.cpp @@ -0,0 +1,397 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/s/catalog/sharding_catalog_manager_impl.h" + +#include "mongo/base/status_with.h" +#include "mongo/client/read_preference.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/s/balancer/balancer_policy.h" +#include "mongo/db/write_concern_options.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_collection.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/catalog/type_tags.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/shard_key_pattern.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +const ReadPreferenceSetting kConfigPrimarySelector(ReadPreference::PrimaryOnly); +const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); + +/** + * Checks if the given key range for the given namespace conflicts with an existing key range. + * Note: range should have the full shard key. 
+ * Returns ErrorCodes::RangeOverlapConflict is an overlap is detected. + */ +Status checkForOveralappedZonedKeyRange(OperationContext* txn, + Shard* configServer, + const NamespaceString& ns, + const ChunkRange& range, + const std::string& zoneName, + const KeyPattern& shardKeyPattern) { + DistributionStatus chunkDist(ns, ShardToChunksMap{}); + + auto tagStatus = configServer->exhaustiveFindOnConfig(txn, + kConfigPrimarySelector, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(TagsType::ConfigNS), + BSON(TagsType::ns(ns.ns())), + BSONObj(), + 0); + if (!tagStatus.isOK()) { + return tagStatus.getStatus(); + } + + const auto& tagDocList = tagStatus.getValue().docs; + for (const auto& tagDoc : tagDocList) { + auto tagParseStatus = TagsType::fromBSON(tagDoc); + if (!tagParseStatus.isOK()) { + return tagParseStatus.getStatus(); + } + + // Always extend ranges to full shard key to be compatible with tags created before + // the zone commands were implemented. + const auto& parsedTagDoc = tagParseStatus.getValue(); + auto overlapStatus = chunkDist.addRangeToZone( + ZoneRange(shardKeyPattern.extendRangeBound(parsedTagDoc.getMinKey(), false), + shardKeyPattern.extendRangeBound(parsedTagDoc.getMaxKey(), false), + parsedTagDoc.getTag())); + if (!overlapStatus.isOK()) { + return overlapStatus; + } + } + + auto overlapStatus = + chunkDist.addRangeToZone(ZoneRange(range.getMin(), range.getMax(), zoneName)); + if (!overlapStatus.isOK()) { + return overlapStatus; + } + + return Status::OK(); +} + +/** + * Returns a new range based on the given range with the full shard key. + * Returns: + * - ErrorCodes::NamespaceNotSharded if ns is not sharded. + * - ErrorCodes::ShardKeyNotFound if range is not compatible (for example, not a prefix of shard + * key) with the shard key of ns. 
+ */ +StatusWith<ChunkRange> includeFullShardKey(OperationContext* txn, + Shard* configServer, + const NamespaceString& ns, + const ChunkRange& range, + KeyPattern* shardKeyPatternOut) { + auto findCollStatus = + configServer->exhaustiveFindOnConfig(txn, + kConfigPrimarySelector, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(CollectionType::ConfigNS), + BSON(CollectionType::fullNs(ns.ns())), + BSONObj(), + 1); + + if (!findCollStatus.isOK()) { + return findCollStatus.getStatus(); + } + + const auto& findCollResult = findCollStatus.getValue().docs; + + if (findCollResult.size() < 1) { + return {ErrorCodes::NamespaceNotSharded, str::stream() << ns.ns() << " is not sharded"}; + } + + auto parseStatus = CollectionType::fromBSON(findCollResult.front()); + if (!parseStatus.isOK()) { + return parseStatus.getStatus(); + } + + auto collDoc = parseStatus.getValue(); + if (collDoc.getDropped()) { + return {ErrorCodes::NamespaceNotSharded, str::stream() << ns.ns() << " is not sharded"}; + } + + const auto& shardKeyPattern = collDoc.getKeyPattern(); + const auto& shardKeyBSON = shardKeyPattern.toBSON(); + *shardKeyPatternOut = shardKeyPattern; + + if (!range.getMin().isFieldNamePrefixOf(shardKeyBSON)) { + return {ErrorCodes::ShardKeyNotFound, + str::stream() << "min: " << range.getMin() << " is not a prefix of the shard key " + << shardKeyBSON + << " of ns: " + << ns.ns()}; + } + + if (!range.getMax().isFieldNamePrefixOf(shardKeyBSON)) { + return {ErrorCodes::ShardKeyNotFound, + str::stream() << "max: " << range.getMax() << " is not a prefix of the shard key " + << shardKeyBSON + << " of ns: " + << ns.ns()}; + } + + return ChunkRange(shardKeyPattern.extendRangeBound(range.getMin(), false), + shardKeyPattern.extendRangeBound(range.getMax(), false)); +} + +} // namespace + +Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn, + const std::string& shardName, + const std::string& zoneName) { + Lock::ExclusiveLock lk(txn->lockState(), 
_kZoneOpLock); + + auto updateStatus = Grid::get(txn)->catalogClient(txn)->updateConfigDocument( + txn, + ShardType::ConfigNS, + BSON(ShardType::name(shardName)), + BSON("$addToSet" << BSON(ShardType::tags() << zoneName)), + false, + kNoWaitWriteConcern); + + if (!updateStatus.isOK()) { + return updateStatus.getStatus(); + } + + if (!updateStatus.getValue()) { + return {ErrorCodes::ShardNotFound, + str::stream() << "shard " << shardName << " does not exist"}; + } + + return Status::OK(); +} + +Status ShardingCatalogManagerImpl::removeShardFromZone(OperationContext* txn, + const std::string& shardName, + const std::string& zoneName) { + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); + + auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); + const NamespaceString shardNS(ShardType::ConfigNS); + + // + // Check whether the shard even exists in the first place. + // + + auto findShardExistsStatus = + configShard->exhaustiveFindOnConfig(txn, + kConfigPrimarySelector, + repl::ReadConcernLevel::kLocalReadConcern, + shardNS, + BSON(ShardType::name() << shardName), + BSONObj(), + 1); + + if (!findShardExistsStatus.isOK()) { + return findShardExistsStatus.getStatus(); + } + + if (findShardExistsStatus.getValue().docs.size() == 0) { + return {ErrorCodes::ShardNotFound, + str::stream() << "shard " << shardName << " does not exist"}; + } + + // + // Check how many shards belong to this zone. + // + + auto findShardStatus = + configShard->exhaustiveFindOnConfig(txn, + kConfigPrimarySelector, + repl::ReadConcernLevel::kLocalReadConcern, + shardNS, + BSON(ShardType::tags() << zoneName), + BSONObj(), + 2); + + if (!findShardStatus.isOK()) { + return findShardStatus.getStatus(); + } + + const auto shardDocs = findShardStatus.getValue().docs; + + if (shardDocs.size() == 0) { + // The zone doesn't exist, this could be a retry. 
+ return Status::OK(); + } + + if (shardDocs.size() == 1) { + auto shardDocStatus = ShardType::fromBSON(shardDocs.front()); + if (!shardDocStatus.isOK()) { + return shardDocStatus.getStatus(); + } + + auto shardDoc = shardDocStatus.getValue(); + if (shardDoc.getName() != shardName) { + // The last shard that belongs to this zone is a different shard. + // This could be a retry, so return OK. + return Status::OK(); + } + + auto findChunkRangeStatus = + configShard->exhaustiveFindOnConfig(txn, + kConfigPrimarySelector, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(TagsType::ConfigNS), + BSON(TagsType::tag() << zoneName), + BSONObj(), + 1); + + if (!findChunkRangeStatus.isOK()) { + return findChunkRangeStatus.getStatus(); + } + + if (findChunkRangeStatus.getValue().docs.size() > 0) { + return {ErrorCodes::ZoneStillInUse, + "cannot remove a shard from zone if a chunk range is associated with it"}; + } + } + + // + // Perform update. + // + + auto updateStatus = Grid::get(txn)->catalogClient(txn)->updateConfigDocument( + txn, + ShardType::ConfigNS, + BSON(ShardType::name(shardName)), + BSON("$pull" << BSON(ShardType::tags() << zoneName)), + false, + kNoWaitWriteConcern); + + if (!updateStatus.isOK()) { + return updateStatus.getStatus(); + } + + // The update did not match a document, another thread could have removed it. 
+ if (!updateStatus.getValue()) { + return {ErrorCodes::ShardNotFound, + str::stream() << "shard " << shardName << " no longer exist"}; + } + + return Status::OK(); +} + + +Status ShardingCatalogManagerImpl::assignKeyRangeToZone(OperationContext* txn, + const NamespaceString& ns, + const ChunkRange& givenRange, + const std::string& zoneName) { + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); + + auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard(); + + KeyPattern shardKeyPattern{BSONObj()}; + auto fullShardKeyStatus = + includeFullShardKey(txn, configServer.get(), ns, givenRange, &shardKeyPattern); + if (!fullShardKeyStatus.isOK()) { + return fullShardKeyStatus.getStatus(); + } + + const auto& fullShardKeyRange = fullShardKeyStatus.getValue(); + + auto zoneExistStatus = + configServer->exhaustiveFindOnConfig(txn, + kConfigPrimarySelector, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ShardType::ConfigNS), + BSON(ShardType::tags() << zoneName), + BSONObj(), + 1); + + if (!zoneExistStatus.isOK()) { + return zoneExistStatus.getStatus(); + } + + auto zoneExist = zoneExistStatus.getValue().docs.size() > 0; + if (!zoneExist) { + return {ErrorCodes::ZoneNotFound, + str::stream() << "zone " << zoneName << " does not exist"}; + } + + auto overlapStatus = checkForOveralappedZonedKeyRange( + txn, configServer.get(), ns, fullShardKeyRange, zoneName, shardKeyPattern); + if (!overlapStatus.isOK()) { + return overlapStatus; + } + + BSONObj updateQuery( + BSON("_id" << BSON(TagsType::ns(ns.ns()) << TagsType::min(fullShardKeyRange.getMin())))); + + BSONObjBuilder updateBuilder; + updateBuilder.append("_id", + BSON(TagsType::ns(ns.ns()) << TagsType::min(fullShardKeyRange.getMin()))); + updateBuilder.append(TagsType::ns(), ns.ns()); + updateBuilder.append(TagsType::min(), fullShardKeyRange.getMin()); + updateBuilder.append(TagsType::max(), fullShardKeyRange.getMax()); + updateBuilder.append(TagsType::tag(), zoneName); + + auto updateStatus = 
Grid::get(txn)->catalogClient(txn)->updateConfigDocument( + txn, TagsType::ConfigNS, updateQuery, updateBuilder.obj(), true, kNoWaitWriteConcern); + + if (!updateStatus.isOK()) { + return updateStatus.getStatus(); + } + + return Status::OK(); +} + +Status ShardingCatalogManagerImpl::removeKeyRangeFromZone(OperationContext* txn, + const NamespaceString& ns, + const ChunkRange& range) { + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); + + auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard(); + + KeyPattern shardKeyPattern{BSONObj()}; + auto fullShardKeyStatus = + includeFullShardKey(txn, configServer.get(), ns, range, &shardKeyPattern); + if (!fullShardKeyStatus.isOK()) { + return fullShardKeyStatus.getStatus(); + } + + BSONObjBuilder removeBuilder; + removeBuilder.append("_id", BSON(TagsType::ns(ns.ns()) << TagsType::min(range.getMin()))); + removeBuilder.append(TagsType::max(), range.getMax()); + + return Grid::get(txn)->catalogClient(txn)->removeConfigDocuments( + txn, TagsType::ConfigNS, removeBuilder.obj(), kNoWaitWriteConcern); +} + +} // namespace mongo diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp index 95189db6ae5..c6c5b219c42 100644 --- a/src/mongo/s/config_server_test_fixture.cpp +++ b/src/mongo/s/config_server_test_fixture.cpp @@ -143,7 +143,7 @@ std::unique_ptr<ShardingCatalogManager> ConfigServerTestFixture::makeShardingCat _addShardNetworkTestEnv = stdx::make_unique<NetworkTestEnv>(specialExec.get(), _mockNetworkForAddShard); - return stdx::make_unique<ShardingCatalogManagerImpl>(catalogClient, std::move(specialExec)); + return stdx::make_unique<ShardingCatalogManagerImpl>(std::move(specialExec)); } std::unique_ptr<CatalogCache> ConfigServerTestFixture::makeCatalogCache() { |