From be8b9869132ed44b25a909476abad03d9254fde9 Mon Sep 17 00:00:00 2001 From: Jess Fan Date: Thu, 28 Jul 2016 20:15:02 -0400 Subject: SERVER-25000 Built _configsvrMergeChunk wrapper around applyOps --- .../db/s/config/configsvr_merge_chunk_command.cpp | 125 +++++++ src/mongo/s/catalog/replset/SConscript | 1 + .../replset/sharding_catalog_manager_impl.cpp | 144 +++++++- .../replset/sharding_catalog_manager_impl.h | 6 + .../replset/sharding_catalog_merge_chunk_test.cpp | 365 +++++++++++++++++++++ src/mongo/s/catalog/sharding_catalog_manager.h | 10 + .../s/catalog/sharding_catalog_manager_mock.cpp | 8 + .../s/catalog/sharding_catalog_manager_mock.h | 6 + src/mongo/s/config_server_test_fixture.cpp | 7 +- .../s/request_types/merge_chunk_request_test.cpp | 76 ++++- .../s/request_types/merge_chunk_request_type.cpp | 26 +- .../s/request_types/merge_chunk_request_type.h | 8 +- 12 files changed, 756 insertions(+), 26 deletions(-) create mode 100644 src/mongo/db/s/config/configsvr_merge_chunk_command.cpp create mode 100644 src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp diff --git a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp new file mode 100644 index 00000000000..596617a1c15 --- /dev/null +++ b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/action_type.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/privilege.h" +#include "mongo/db/commands.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/request_types/merge_chunk_request_type.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace { + +using std::string; + +/** + * Internal sharding command run on config servers to merge a set of chunks. + * + * Format: + * { + * _configsvrMergeChunk: , + * collEpoch: , + * chunkBoundaries: [ + * , + * , + * ... + * ], + * shard: , + * writeConcern: + * } + */ +class ConfigSvrMergeChunkCommand : public Command { +public: + ConfigSvrMergeChunkCommand() : Command("_configsvrMergeChunk") {} + + void help(std::stringstream& help) const override { + help << "Internal command, which is sent by a shard to the sharding config server. Do " + "not call directly. Receives, validates, and processes a MergeChunkRequest" + } + + bool slaveOk() const override { + return false; + } + + bool adminOnly() const override { + return true; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return true; + } + + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) override { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forClusterResource(), ActionType::internal)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); + } + return Status::OK(); + } + + std::string parseNs(const std::string& dbname, const BVSONObj& cmdObj) const override { + return parseNsFullyQualified(dbname, cmdObj); + } + + bool run(OperationContext* txn, + const std::string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { + if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { + uasserted(ErrorCodes::IllegalOperation, + "_configsvrMergeChunk can only be run on config servers"); + } + + auto parsedRequest = uassertStatusOK(MergeChunkRequest::parseFromConfigCommand(cmdObj)); + + Status mergeChunkResult = + Grid::get(txn)->catalogManager()->commitChunkMerge(txn, + parsedRequest.getNamespace(), + parsedRequest.getEpoch(), + parsedRequest.getChunkBoundaries(), + parsedRequest.getShardName()); + + uassertStatusOK(mergeChunkResult); + + return true; + } +} configsvrMergeChunkCmd; +} // namespace +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript index 9296788c94d..bc50301fc6c 100644 --- a/src/mongo/s/catalog/replset/SConscript +++ b/src/mongo/s/catalog/replset/SConscript @@ -113,6 +113,7 @@ env.CppUnitTest( 'sharding_catalog_remove_shard_from_zone_test.cpp', 'sharding_catalog_config_initialization_test.cpp', 'sharding_catalog_split_chunk_test.cpp', + 'sharding_catalog_merge_chunk_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/repl/replmocks', diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index dc288b6cde8..c85d0464d90 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -1014,7 +1014,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); // Acquire GlobalLock in MODE_X twice to prevent yielding. - // GlobalLock and the following lock on config.chunks are only needed to support + // GLobalLock and the following lock on config.chunks are only needed to support // mixed-mode operation with mongoses from 3.2 // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4 Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX); @@ -1158,6 +1158,148 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, return applyOpsStatus; } +Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const std::vector& chunkBoundaries, + const std::string& shardName) { + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); + + // Acquire GlobalLock in MODE_X twice to prevent yielding. + // GLobalLock and the following lock on config.chunks are only needed to support + // mixed-mode operation with mongoses from 3.2 + // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4 + Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX); + Lock::GlobalLock secondGlobalLock(txn->lockState(), MODE_X, UINT_MAX); + + // Acquire lock on config.chunks in MODE_X + AutoGetCollection autoColl(txn, NamespaceString(ChunkType::ConfigNS), MODE_X); + + // Get the chunk with the highest version for this namespace + auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON("ns" << ns.ns()), + BSON(ChunkType::DEPRECATED_lastmod << -1), + 1); + + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& chunksVector = findStatus.getValue().docs; + if (chunksVector.empty()) + return {ErrorCodes::IllegalOperation, + "collection does not exist, isn't sharded, or has no chunks"}; + + ChunkVersion collVersion = + ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); + + // Return an error if epoch of chunk does not match epoch of request + if (collVersion.epoch() != requestEpoch) { + return {ErrorCodes::StaleEpoch, + "epoch of chunk does not match epoch of request. This most likely means " + "that the collection was dropped and re-created."}; + } + + // Build chunks to be merged + std::vector chunksToMerge; + + ChunkType itChunk; + itChunk.setMax(chunkBoundaries.front()); + itChunk.setNS(ns.ns()); + itChunk.setShard(shardName); + + // Do not use the first chunk boundary as a max bound while building chunks + for (size_t i = 1; i < chunkBoundaries.size(); ++i) { + itChunk.setMin(itChunk.getMax()); + itChunk.setMax(chunkBoundaries[i]); + chunksToMerge.push_back(itChunk); + } + + ChunkVersion mergeVersion = collVersion; + mergeVersion.incMinor(); + + BSONArrayBuilder updates; + + // Build an update operation to expand the first chunk into the newly merged chunk + { + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", false); + op.append("ns", ChunkType::ConfigNS); + + // expand first chunk into newly merged chunk + ChunkType mergedChunk(chunksToMerge.front()); + mergedChunk.setMax(chunksToMerge.back().getMax()); + + // fill in additional details for sending through applyOps + mergedChunk.setVersion(mergeVersion); + + // add the new chunk information as the update object + op.append("o", mergedChunk.toBSON()); + + // query object + op.append("o2", BSON(ChunkType::name(mergedChunk.getName()))); + + updates.append(op.obj()); + } + + // Build update operations to delete the rest of the chunks to be merged. Remember not + // to delete the first chunk we're expanding + for (size_t i = 1; i < chunksToMerge.size(); ++i) { + BSONObjBuilder op; + op.append("op", "d"); + op.append("ns", ChunkType::ConfigNS); + + op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName()))); + + updates.append(op.obj()); + } + + BSONArrayBuilder preCond; + { + BSONObjBuilder b; + b.append("ns", ChunkType::ConfigNS); + b.append("q", + BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby" + << BSON(ChunkType::DEPRECATED_lastmod() << -1))); + { + BSONObjBuilder bb(b.subobjStart("res")); + collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + } + preCond.append(b.obj()); + } + + // apply the batch of updates to remote and local metadata + Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated( + txn, updates.arr(), preCond.arr(), ns.ns(), mergeVersion); + if (!applyOpsStatus.isOK()) { + return applyOpsStatus; + } + + // log changes + BSONObjBuilder logDetail; + { + BSONArrayBuilder b(logDetail.subarrayStart("merged")); + for (auto chunkToMerge : chunksToMerge) { + b.append(chunkToMerge.toBSON()); + } + } + collVersion.addToBSON(logDetail, "prevShardVersion"); + mergeVersion.addToBSON(logDetail, "mergedVersion"); + + grid.catalogClient(txn)->logChange(txn, "merge", ns.ns(), logDetail.obj()); + + return applyOpsStatus; +} + void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { _executorForAddShard->appendConnectionStats(stats); } diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h index 339e08522b0..b25c7d1bf8c 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h @@ -94,6 +94,12 @@ public: const std::vector& splitPoints, const std::string& shardName) override; + Status commitChunkMerge(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const std::vector& chunkBoundaries, + const std::string& shardName) override; + void appendConnectionStats(executor::ConnectionPoolStats* stats) override; Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; diff --git a/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp new file mode 100644 index 00000000000..c2107be7390 --- /dev/null +++ b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp @@ -0,0 +1,365 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/read_preference.h" +#include "mongo/db/namespace_string.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/config_server_test_fixture.h" + +namespace mongo { +namespace { + +using MergeChunkTest = ConfigServerTestFixture; + +TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + // Construct chunk to be merged + auto chunk2(chunk); + + auto chunkMin = BSON("a" << 1); + auto chunkBound = BSON("a" << 5); + auto chunkMax = BSON("a" << 10); + // first chunk boundaries + chunk.setMin(chunkMin); + chunk.setMax(chunkBound); + // second chunk boundaries + chunk2.setMin(chunkBound); + chunk2.setMax(chunkMax); + + std::vector chunkBoundaries{chunkMin, chunkBound, chunkMax}; + + setupChunks({chunk, chunk2}); + + ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(), + NamespaceString("TestDB.TestColl"), + origVersion.epoch(), + chunkBoundaries, + "shard0000")); + + auto findResponse = uassertStatusOK( + getConfigShard()->exhaustiveFindOnConfig(operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON(ChunkType::ns() << "TestDB.TestColl"), + BSON(ChunkType::DEPRECATED_lastmod << -1), + boost::none)); + + const auto& chunksVector = findResponse.docs; + + // There should be exactly one chunk left in the collection + ASSERT_EQ(1u, chunksVector.size()); + + // MergedChunk should have range [chunkMin, chunkMax] + auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front())); + ASSERT_EQ(chunkMin, mergedChunk.getMin()); + ASSERT_EQ(chunkMax, mergedChunk.getMax()); + + { + // Check for increment on mergedChunk's minor version + ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion()); + } +} + +TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + // Construct chunks to be merged + auto chunk2(chunk); + auto chunk3(chunk); + + auto chunkMin = BSON("a" << 1); + auto chunkBound = BSON("a" << 5); + auto chunkBound2 = BSON("a" << 7); + auto chunkMax = BSON("a" << 10); + // first chunk boundaries + chunk.setMin(chunkMin); + chunk.setMax(chunkBound); + // second chunk boundaries + chunk2.setMin(chunkBound); + chunk2.setMax(chunkBound2); + // third chunk boundaries + chunk3.setMin(chunkBound2); + chunk3.setMax(chunkMax); + + // Record chunk boundaries for passing into commitChunkMerge + std::vector chunkBoundaries{chunkMin, chunkBound, chunkBound2, chunkMax}; + + setupChunks({chunk, chunk2, chunk3}); + + ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(), + NamespaceString("TestDB.TestColl"), + origVersion.epoch(), + chunkBoundaries, + "shard0000")); + + auto findResponse = uassertStatusOK( + getConfigShard()->exhaustiveFindOnConfig(operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON(ChunkType::ns() << "TestDB.TestColl"), + BSON(ChunkType::DEPRECATED_lastmod << -1), + boost::none)); + + const auto& chunksVector = findResponse.docs; + + // There should be exactly one chunk left in the collection + ASSERT_EQ(1u, chunksVector.size()); + + // MergedChunk should have range [chunkMin, chunkMax] + auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front())); + ASSERT_EQ(chunkMin, mergedChunk.getMin()); + ASSERT_EQ(chunkMax, mergedChunk.getMax()); + + { + // Check for increment on mergedChunk's minor version + ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion()); + } +} + +TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) { + ChunkType chunk, otherChunk; + chunk.setNS("TestDB.TestColl"); + otherChunk.setNS("TestDB.TestColl"); + auto collEpoch = OID::gen(); + + auto origVersion = ChunkVersion(1, 2, collEpoch); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + // Construct chunk to be merged + auto chunk2(chunk); + + auto chunkMin = BSON("a" << 1); + auto chunkBound = BSON("a" << 5); + auto chunkMax = BSON("a" << 10); + // first chunk boundaries + chunk.setMin(chunkMin); + chunk.setMax(chunkBound); + // second chunk boundaries + chunk2.setMin(chunkBound); + chunk2.setMax(chunkMax); + + // Record chunk boundaries for passing into commitChunkMerge + std::vector chunkBoundaries{chunkMin, chunkBound, chunkMax}; + + // Set up other chunk with competing version + auto competingVersion = ChunkVersion(2, 1, collEpoch); + otherChunk.setVersion(competingVersion); + otherChunk.setShard(ShardId("shard0000")); + otherChunk.setMin(BSON("a" << 10)); + otherChunk.setMax(BSON("a" << 20)); + + setupChunks({chunk, chunk2, otherChunk}); + + ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(), + NamespaceString("TestDB.TestColl"), + collEpoch, + chunkBoundaries, + "shard0000")); + + auto findResponse = uassertStatusOK( + getConfigShard()->exhaustiveFindOnConfig(operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON(ChunkType::ns() << "TestDB.TestColl"), + BSON(ChunkType::DEPRECATED_lastmod << -1), + boost::none)); + + const auto& chunksVector = findResponse.docs; + + // There should be exactly two chunks left in the collection: one merged, one competing + ASSERT_EQ(2u, chunksVector.size()); + + // MergedChunk should have range [chunkMin, chunkMax] + auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front())); + ASSERT_EQ(chunkMin, mergedChunk.getMin()); + ASSERT_EQ(chunkMax, mergedChunk.getMax()); + + { + // Check for minor increment on collection version + ASSERT_EQ(competingVersion.majorVersion(), mergedChunk.getVersion().majorVersion()); + ASSERT_EQ(competingVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion()); + } +} + +TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 2, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + // Construct chunk to be merged + auto chunk2(chunk); + + auto chunkMin = BSON("a" << 1); + auto chunkBound = BSON("a" << 5); + auto chunkMax = BSON("a" << 10); + // first chunk boundaries + chunk.setMin(chunkMin); + chunk.setMax(chunkBound); + // second chunk boundaries + chunk2.setMin(chunkBound); + chunk2.setMax(chunkMax); + + // Record chunk boundaries for passing into commitChunkMerge + std::vector chunkBoundaries{chunkMin, chunkBound, chunkMax}; + + // Set up unmerged chunk + auto otherChunk(chunk); + otherChunk.setMin(BSON("a" << 10)); + otherChunk.setMax(BSON("a" << 20)); + + setupChunks({chunk, chunk2, otherChunk}); + + ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(), + NamespaceString("TestDB.TestColl"), + origVersion.epoch(), + chunkBoundaries, + "shard0000")); + + auto findResponse = uassertStatusOK( + getConfigShard()->exhaustiveFindOnConfig(operationContext(), + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON(ChunkType::ns() << "TestDB.TestColl"), + BSON(ChunkType::DEPRECATED_lastmod << -1), + boost::none)); + + const auto& chunksVector = findResponse.docs; + + // There should be exactly two chunks left in the collection: one merged, one untouched + ASSERT_EQ(2u, chunksVector.size()); + + // MergedChunk should have range [chunkMin, chunkMax] + auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front())); + ASSERT_EQ(chunkMin, mergedChunk.getMin()); + ASSERT_EQ(chunkMax, mergedChunk.getMax()); + + { + // Check for increment on mergedChunk's minor version + ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion()); + } + + // OtherChunk should have been left alone + auto foundOtherChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.back())); + ASSERT_EQ(otherChunk.getMin(), foundOtherChunk.getMin()); + ASSERT_EQ(otherChunk.getMax(), foundOtherChunk.getMax()); +} + +TEST_F(MergeChunkTest, NonExistingNamespace) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + // Construct chunk to be merged + auto chunk2(chunk); + + auto chunkMin = BSON("a" << 1); + auto chunkBound = BSON("a" << 5); + auto chunkMax = BSON("a" << 10); + // first chunk boundaries + chunk.setMin(chunkMin); + chunk.setMax(chunkBound); + chunk2.setMin(chunkBound); + chunk2.setMax(chunkMax); + + // Record chunk boundaries for passing into commitChunkMerge + std::vector chunkBoundaries{chunkMin, chunkBound, chunkMax}; + + setupChunks({chunk, chunk2}); + + auto mergeStatus = catalogManager()->commitChunkMerge(operationContext(), + NamespaceString("TestDB.NonExistingColl"), + origVersion.epoch(), + chunkBoundaries, + "shard0000"); + ASSERT_EQ(ErrorCodes::IllegalOperation, mergeStatus); +} + +TEST_F(MergeChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + // Construct chunk to be merged + auto chunk2(chunk); + + auto chunkMin = BSON("a" << 1); + auto chunkBound = BSON("a" << 5); + auto chunkMax = BSON("a" << 10); + // first chunk boundaries + chunk.setMin(chunkMin); + chunk.setMax(chunkBound); + chunk2.setMin(chunkBound); + chunk2.setMax(chunkMax); + + // Record chunk baoundaries for passing into commitChunkMerge + std::vector chunkBoundaries{chunkMin, chunkBound, chunkMax}; + + setupChunks({chunk, chunk2}); + + auto mergeStatus = catalogManager()->commitChunkMerge(operationContext(), + NamespaceString("TestDB.TestColl"), + OID::gen(), + chunkBoundaries, + "shard0000"); + ASSERT_EQ(ErrorCodes::StaleEpoch, mergeStatus); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h index 58c39631fd6..a478d55f2d8 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager.h +++ b/src/mongo/s/catalog/sharding_catalog_manager.h @@ -148,6 +148,16 @@ public: const std::vector& splitPoints, const std::string& shardName) = 0; + /** + * Updates chunk metadata in config.chunks collection to reflect the given chunks being merged + * into a single larger chunk based on the specified boundaries of the smaller chunks. + */ + virtual Status commitChunkMerge(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const std::vector& chunkBoundaries, + const std::string& shardName) = 0; + /** * Append information about the connection pools owned by the CatalogManager. */ diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp index 57acb99abae..577c179cd49 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp @@ -90,6 +90,14 @@ Status ShardingCatalogManagerMock::commitChunkSplit(OperationContext* txn, return {ErrorCodes::InternalError, "Method not implemented"}; } +Status ShardingCatalogManagerMock::commitChunkMerge(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const std::vector& chunkBoundaries, + const std::string& shardName) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {} Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) { diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h index 7c346bc3e91..1b49be01afe 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h @@ -76,6 +76,12 @@ public: const std::vector& splitPoints, const std::string& shardName) override; + Status commitChunkMerge(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const std::vector& chunkBoundaries, + const std::string& shardName) override; + void appendConnectionStats(executor::ConnectionPoolStats* stats) override; Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp index e769876bdd9..546a1d43847 100644 --- a/src/mongo/s/config_server_test_fixture.cpp +++ b/src/mongo/s/config_server_test_fixture.cpp @@ -425,11 +425,10 @@ Status ConfigServerTestFixture::setupChunks(const std::vector& chunks StatusWith ConfigServerTestFixture::getChunkDoc(OperationContext* txn, const BSONObj& minKey) { - auto doc = - findOneOnConfigCollection(txn, NamespaceString(ChunkType::ConfigNS), BSON("min" << minKey)); - if (!doc.isOK()) { + auto doc = findOneOnConfigCollection( + txn, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::min() << minKey)); + if (!doc.isOK()) return doc.getStatus(); - } return ChunkType::fromBSON(doc.getValue()); } diff --git a/src/mongo/s/request_types/merge_chunk_request_test.cpp b/src/mongo/s/request_types/merge_chunk_request_test.cpp index a3610fbe5e0..2bca08c5fba 100644 --- a/src/mongo/s/request_types/merge_chunk_request_test.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_test.cpp @@ -46,12 +46,15 @@ TEST(MergeChunkRequest, BasicValidConfigCommand) { << "collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << "shard0000"))); ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace()); ASSERT_EQ(OID("7fffffff0000000000000001"), request.getEpoch()); ASSERT_EQ(BSON("a" << 1), request.getChunkBoundaries().at(0)); ASSERT_EQ(BSON("a" << 5), request.getChunkBoundaries().at(1)); ASSERT_EQ(BSON("a" << 10), request.getChunkBoundaries().at(2)); + ASSERT_EQ("shard0000", request.getShardName()); } TEST(MergeChunkRequest, ConfigCommandtoBSON) { @@ -61,7 +64,9 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) { << "collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << "shard0000"); BSONObj writeConcernObj = BSON("writeConcern" << BSON("w" << "majority")); @@ -80,7 +85,9 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) { TEST(MergeChunkRequest, MissingNameSpaceErrors) { auto request = MergeChunkRequest::parseFromConfigCommand( BSON("collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); } @@ -89,16 +96,30 @@ TEST(MergeChunkRequest, MissingCollEpochErrors) { BSON("_configsvrMergeChunk" << "TestDB.TestColl" << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); } TEST(MergeChunkRequest, MissingChunkBoundariesErrors) { - auto request = - MergeChunkRequest::parseFromConfigCommand(BSON("_configsvrMergeChunk" - << "TestDB.TestColl" - << "collEpoch" - << OID("7fffffff0000000000000001"))); + auto request = MergeChunkRequest::parseFromConfigCommand(BSON("_configsvrMergeChunk" + << "TestDB.TestColl" + << "collEpoch" + << OID("7fffffff0000000000000001") + << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(MergeChunkRequest, MissingShardNameErrors) { + auto request = MergeChunkRequest::parseFromConfigCommand( + BSON("_configsvrMergeChunk" + << "TestDB.TestColl" + << "collEpoch" + << OID("7fffffff0000000000000001") + << "chunkBoundaries" + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)))); ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); } @@ -106,7 +127,9 @@ TEST(MergeChunkRequest, WrongNamespaceTypeErrors) { auto request = MergeChunkRequest::parseFromConfigCommand(BSON( "_configsvrMergeChunk" << 1234 << "collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); } @@ -117,7 +140,9 @@ TEST(MergeChunkRequest, WrongCollEpochTypeErrors) { << "collEpoch" << 1234 << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); } @@ -127,7 +152,22 @@ TEST(MergeChunkRequest, WrongChunkBoundariesTypeErrors) { << "collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << 1234)); + << 1234 + << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(MergeChunkRequest, WrongShardNameTypeErrors) { + auto request = MergeChunkRequest::parseFromConfigCommand( + BSON("_configsvrMergeChunk" + << "TestDB.TestColl" + << "collEpoch" + << OID("7fffffff0000000000000001") + << "chunkBoundaries" + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << 1234)); ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); } @@ -138,7 +178,9 @@ TEST(MergeChunkRequest, InvalidNamespaceErrors) { << "collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::InvalidNamespace, request.getStatus()); } @@ -148,7 +190,9 @@ TEST(MergeChunkRequest, EmptyChunkBoundariesErrors) { << "collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << BSONArray())); + << BSONArray() + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::InvalidOptions, request.getStatus()); } @@ -159,7 +203,9 @@ TEST(MergeChunkRequest, TooFewChunkBoundariesErrors) { << "collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries" - << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 10)))); + << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 10)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::InvalidOptions, request.getStatus()); } } diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp index 75f19e7f610..508bf9b247d 100644 --- a/src/mongo/s/request_types/merge_chunk_request_type.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp @@ -35,6 +35,7 @@ namespace mongo { +using std::string; using std::vector; namespace { @@ -42,18 +43,21 @@ namespace { const char kConfigsvrMergeChunk[] = "_configsvrMergeChunk"; const char kCollEpoch[] = "collEpoch"; const char kChunkBoundaries[] = "chunkBoundaries"; +const char kShardName[] = "shard"; } // unnamed namespace MergeChunkRequest::MergeChunkRequest(NamespaceString nss, OID epoch, - vector chunkBoundaries) + vector chunkBoundaries, + string shardName) : _nss(std::move(nss)), _epoch(std::move(epoch)), - _chunkBoundaries(std::move(chunkBoundaries)) {} + _chunkBoundaries(std::move(chunkBoundaries)), + _shardName(std::move(shardName)) {} StatusWith MergeChunkRequest::parseFromConfigCommand(const BSONObj& cmdObj) { - std::string ns; + string ns; auto parseNamespaceStatus = bsonExtractStringField(cmdObj, kConfigsvrMergeChunk, &ns); if (!parseNamespaceStatus.isOK()) { @@ -82,8 +86,15 @@ StatusWith MergeChunkRequest::parseFromConfigCommand(const BS } } - auto request = - MergeChunkRequest(NamespaceString(ns), std::move(epoch), std::move(chunkBoundaries)); + string shardName; + auto parseShardNameStatus = bsonExtractStringField(cmdObj, kShardName, &shardName); + + if (!parseShardNameStatus.isOK()) { + return parseShardNameStatus; + } + + auto request = MergeChunkRequest( + NamespaceString(ns), std::move(epoch), std::move(chunkBoundaries), std::move(shardName)); Status validationStatus = request._validate(); if (!validationStatus.isOK()) { return validationStatus; @@ -111,6 +122,7 @@ void MergeChunkRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) { chunkBoundariesArray.append(chunkBoundary); } } + cmdBuilder->append(kShardName, _shardName); } const NamespaceString& MergeChunkRequest::getNamespace() const { @@ -125,6 +137,10 @@ const vector& MergeChunkRequest::getChunkBoundaries() const { return _chunkBoundaries; } +const string& MergeChunkRequest::getShardName() const { + return _shardName; +} + Status MergeChunkRequest::_validate() { if (!getNamespace().isValid()) { return Status(ErrorCodes::InvalidNamespace, diff --git a/src/mongo/s/request_types/merge_chunk_request_type.h b/src/mongo/s/request_types/merge_chunk_request_type.h index 8be781f3626..5aa72ab6356 100644 --- a/src/mongo/s/request_types/merge_chunk_request_type.h +++ b/src/mongo/s/request_types/merge_chunk_request_type.h @@ -43,7 +43,10 @@ namespace mongo { */ class MergeChunkRequest { public: - MergeChunkRequest(NamespaceString nss, OID epoch, std::vector chunkBoundaries); + MergeChunkRequest(NamespaceString nss, + OID epoch, + std::vector chunkBoundaries, + std::string shardName); /** * Parses the provided BSON content as the internal _configsvrMergeChunk command, and if @@ -57,6 +60,7 @@ public: * , * ... * ], + * shard: * } */ static StatusWith parseFromConfigCommand(const BSONObj& cmdObj); @@ -77,6 +81,7 @@ public: const NamespaceString& getNamespace() const; const OID& getEpoch() const; const std::vector& getChunkBoundaries() const; + const std::string& getShardName() const; private: /** @@ -93,6 +98,7 @@ private: * Specifies the boundaries of the chunks to be merged. */ std::vector _chunkBoundaries; + std::string _shardName; }; } // namespace mongo -- cgit v1.2.1