author     Jess Fan <jess.fan@10gen.com>   2016-07-28 20:15:02 -0400
committer  Jess Fan <jess.fan@10gen.com>   2016-08-11 11:40:42 -0400
commit     be8b9869132ed44b25a909476abad03d9254fde9 (patch)
tree       9a0e3df98699293ea5a2a40d5851b3cf3fa53756
parent     eb15955c67b8a13455b91a6848f8750447fb0f44 (diff)
download   mongo-be8b9869132ed44b25a909476abad03d9254fde9.tar.gz
SERVER-25000 Built _configsvrMergeChunk wrapper around applyOps
-rw-r--r--  src/mongo/db/s/config/configsvr_merge_chunk_command.cpp             127
-rw-r--r--  src/mongo/s/catalog/replset/SConscript                                1
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp       144
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h           6
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp   365
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager.h                        10
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.cpp                 8
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.h                   6
-rw-r--r--  src/mongo/s/config_server_test_fixture.cpp                            7
-rw-r--r--  src/mongo/s/request_types/merge_chunk_request_test.cpp               76
-rw-r--r--  src/mongo/s/request_types/merge_chunk_request_type.cpp               26
-rw-r--r--  src/mongo/s/request_types/merge_chunk_request_type.h                  8
12 files changed, 758 insertions(+), 26 deletions(-)
diff --git a/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
new file mode 100644
index 00000000000..596617a1c15
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_merge_chunk_command.cpp
@@ -0,0 +1,127 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/action_type.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/privilege.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/request_types/merge_chunk_request_type.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace {
+
+using std::string;
+
+/**
+ * Internal sharding command run on config servers to merge a set of chunks.
+ *
+ * Format:
+ * {
+ * _configsvrMergeChunk: <string namespace>,
+ * collEpoch: <OID epoch>,
+ * chunkBoundaries: [
+ * <BSONObj key1>,
+ * <BSONObj key2>,
+ * ...
+ * ],
+ * shard: <string shard>,
+ * writeConcern: <BSONObj>
+ * }
+ */
+class ConfigSvrMergeChunkCommand : public Command {
+public:
+ ConfigSvrMergeChunkCommand() : Command("_configsvrMergeChunk") {}
+
+ void help(std::stringstream& help) const override {
+ help << "Internal command, which is sent by a shard to the sharding config server. Do "
+ "not call directly. Receives, validates, and processes a MergeChunkRequest"
+ }
+
+ bool slaveOk() const override {
+ return false;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return true;
+ }
+
+ Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
+ if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+ ResourcePattern::forClusterResource(), ActionType::internal)) {
+ return Status(ErrorCodes::Unauthorized, "Unauthorized");
+ }
+ return Status::OK();
+ }
+
+    std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
+ return parseNsFullyQualified(dbname, cmdObj);
+ }
+
+ bool run(OperationContext* txn,
+ const std::string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
+ if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
+ uasserted(ErrorCodes::IllegalOperation,
+ "_configsvrMergeChunk can only be run on config servers");
+ }
+
+ auto parsedRequest = uassertStatusOK(MergeChunkRequest::parseFromConfigCommand(cmdObj));
+
+ Status mergeChunkResult =
+ Grid::get(txn)->catalogManager()->commitChunkMerge(txn,
+ parsedRequest.getNamespace(),
+ parsedRequest.getEpoch(),
+ parsedRequest.getChunkBoundaries(),
+ parsedRequest.getShardName());
+
+ uassertStatusOK(mergeChunkResult);
+
+ return true;
+ }
+} configsvrMergeChunkCmd;
+} // namespace
+} // namespace mongo
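For illustration, here is a minimal sketch of a command document matching the format this new command parses. It uses the same BSON-building macros as the tests further down; the helper name and the namespace, epoch, boundary, and shard values are made-up examples, not values from this commit:

    // Sketch: hand-building a _configsvrMergeChunk command document.
    // All field values below are illustrative placeholders.
    #include "mongo/bson/oid.h"
    #include "mongo/db/jsobj.h"

    mongo::BSONObj makeExampleMergeChunkCmd() {
        using namespace mongo;
        return BSON("_configsvrMergeChunk"
                    << "TestDB.TestColl"
                    << "collEpoch" << OID::gen()
                    << "chunkBoundaries"
                    << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
                    << "shard"
                    << "shard0000"
                    << "writeConcern" << BSON("w"
                                              << "majority"));
    }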
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 9296788c94d..bc50301fc6c 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -113,6 +113,7 @@ env.CppUnitTest(
'sharding_catalog_remove_shard_from_zone_test.cpp',
'sharding_catalog_config_initialization_test.cpp',
'sharding_catalog_split_chunk_test.cpp',
+ 'sharding_catalog_merge_chunk_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/replmocks',
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index dc288b6cde8..c85d0464d90 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -1014,7 +1014,7 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
// Acquire GlobalLock in MODE_X twice to prevent yielding.
-    // GLobalLock and the following lock on config.chunks are only needed to support
+    // GlobalLock and the following lock on config.chunks are only needed to support
// mixed-mode operation with mongoses from 3.2
// TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4
Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
@@ -1158,6 +1158,148 @@ Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
return applyOpsStatus;
}
+Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const std::vector<BSONObj>& chunkBoundaries,
+ const std::string& shardName) {
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations
+ // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/
+ // move chunks on different collections to proceed in parallel
+ Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
+
+ // Acquire GlobalLock in MODE_X twice to prevent yielding.
+    // GlobalLock and the following lock on config.chunks are only needed to support
+ // mixed-mode operation with mongoses from 3.2
+ // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4
+ Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
+ Lock::GlobalLock secondGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
+
+ // Acquire lock on config.chunks in MODE_X
+ AutoGetCollection autoColl(txn, NamespaceString(ChunkType::ConfigNS), MODE_X);
+
+ // Get the chunk with the highest version for this namespace
+ auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON("ns" << ns.ns()),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ 1);
+
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ const auto& chunksVector = findStatus.getValue().docs;
+ if (chunksVector.empty())
+ return {ErrorCodes::IllegalOperation,
+ "collection does not exist, isn't sharded, or has no chunks"};
+
+ ChunkVersion collVersion =
+ ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
+
+ // Return an error if epoch of chunk does not match epoch of request
+ if (collVersion.epoch() != requestEpoch) {
+ return {ErrorCodes::StaleEpoch,
+ "epoch of chunk does not match epoch of request. This most likely means "
+ "that the collection was dropped and re-created."};
+ }
+
+ // Build chunks to be merged
+ std::vector<ChunkType> chunksToMerge;
+
+ ChunkType itChunk;
+ itChunk.setMax(chunkBoundaries.front());
+ itChunk.setNS(ns.ns());
+ itChunk.setShard(shardName);
+
+ // Do not use the first chunk boundary as a max bound while building chunks
+ for (size_t i = 1; i < chunkBoundaries.size(); ++i) {
+ itChunk.setMin(itChunk.getMax());
+ itChunk.setMax(chunkBoundaries[i]);
+ chunksToMerge.push_back(itChunk);
+ }
+
+ ChunkVersion mergeVersion = collVersion;
+ mergeVersion.incMinor();
+
+ BSONArrayBuilder updates;
+
+ // Build an update operation to expand the first chunk into the newly merged chunk
+ {
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", false);
+ op.append("ns", ChunkType::ConfigNS);
+
+ // expand first chunk into newly merged chunk
+ ChunkType mergedChunk(chunksToMerge.front());
+ mergedChunk.setMax(chunksToMerge.back().getMax());
+
+ // fill in additional details for sending through applyOps
+ mergedChunk.setVersion(mergeVersion);
+
+ // add the new chunk information as the update object
+ op.append("o", mergedChunk.toBSON());
+
+ // query object
+ op.append("o2", BSON(ChunkType::name(mergedChunk.getName())));
+
+ updates.append(op.obj());
+ }
+
+ // Build update operations to delete the rest of the chunks to be merged. Remember not
+ // to delete the first chunk we're expanding
+ for (size_t i = 1; i < chunksToMerge.size(); ++i) {
+ BSONObjBuilder op;
+ op.append("op", "d");
+ op.append("ns", ChunkType::ConfigNS);
+
+ op.append("o", BSON(ChunkType::name(chunksToMerge[i].getName())));
+
+ updates.append(op.obj());
+ }
+
+ BSONArrayBuilder preCond;
+ {
+ BSONObjBuilder b;
+ b.append("ns", ChunkType::ConfigNS);
+ b.append("q",
+ BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby"
+ << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+ {
+ BSONObjBuilder bb(b.subobjStart("res"));
+ collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod());
+ }
+ preCond.append(b.obj());
+ }
+
+ // apply the batch of updates to remote and local metadata
+ Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated(
+ txn, updates.arr(), preCond.arr(), ns.ns(), mergeVersion);
+ if (!applyOpsStatus.isOK()) {
+ return applyOpsStatus;
+ }
+
+ // log changes
+ BSONObjBuilder logDetail;
+ {
+ BSONArrayBuilder b(logDetail.subarrayStart("merged"));
+ for (auto chunkToMerge : chunksToMerge) {
+ b.append(chunkToMerge.toBSON());
+ }
+ }
+ collVersion.addToBSON(logDetail, "prevShardVersion");
+ mergeVersion.addToBSON(logDetail, "mergedVersion");
+
+ grid.catalogClient(txn)->logChange(txn, "merge", ns.ns(), logDetail.obj());
+
+ return applyOpsStatus;
+}
+
void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}
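To make the update construction above concrete: merging the two chunks [{a: 1}, {a: 5}) and [{a: 5}, {a: 10}) yields one "u" operation that widens the first chunk to the full range and bumps its minor version, plus one "d" operation for each remaining chunk, all guarded by a precondition that the collection version has not moved. A hedged sketch of the resulting payload follows; the chunk _id strings assume the ns-plus-min-key naming that ChunkType generates, and the exact lastmod encoding is elided:

    // Illustrative applyOps payload for a two-chunk merge (values are examples):
    //
    //   updates: [
    //     { op: "u", b: false, ns: "config.chunks",
    //       o:  { _id: "TestDB.TestColl-a_1", ns: "TestDB.TestColl",
    //             min: { a: 1 }, max: { a: 10 }, shard: "shard0000",
    //             lastmod: <collVersion with minor version + 1>, ... },
    //       o2: { _id: "TestDB.TestColl-a_1" } },
    //     { op: "d", ns: "config.chunks", o: { _id: "TestDB.TestColl-a_5" } }
    //   ],
    //   preCondition: [
    //     { ns: "config.chunks",
    //       q: { query: { ns: "TestDB.TestColl" }, orderby: { lastmod: -1 } },
    //       res: { lastmod: <current collVersion> } }
    //   ]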
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index 339e08522b0..b25c7d1bf8c 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -94,6 +94,12 @@ public:
const std::vector<BSONObj>& splitPoints,
const std::string& shardName) override;
+ Status commitChunkMerge(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const std::vector<BSONObj>& chunkBoundaries,
+ const std::string& shardName) override;
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
new file mode 100644
index 00000000000..c2107be7390
--- /dev/null
+++ b/src/mongo/s/catalog/replset/sharding_catalog_merge_chunk_test.cpp
@@ -0,0 +1,365 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/read_preference.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/config_server_test_fixture.h"
+
+namespace mongo {
+namespace {
+
+using MergeChunkTest = ConfigServerTestFixture;
+
+TEST_F(MergeChunkTest, MergeExistingChunksCorrectlyShouldSucceed) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ // Construct chunk to be merged
+ auto chunk2(chunk);
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkBound = BSON("a" << 5);
+ auto chunkMax = BSON("a" << 10);
+ // first chunk boundaries
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkBound);
+ // second chunk boundaries
+ chunk2.setMin(chunkBound);
+ chunk2.setMax(chunkMax);
+
+ std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkMax};
+
+ setupChunks({chunk, chunk2});
+
+ ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ origVersion.epoch(),
+ chunkBoundaries,
+ "shard0000"));
+
+ auto findResponse = uassertStatusOK(
+ getConfigShard()->exhaustiveFindOnConfig(operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << "TestDB.TestColl"),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ boost::none));
+
+ const auto& chunksVector = findResponse.docs;
+
+ // There should be exactly one chunk left in the collection
+ ASSERT_EQ(1u, chunksVector.size());
+
+ // MergedChunk should have range [chunkMin, chunkMax]
+ auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front()));
+ ASSERT_EQ(chunkMin, mergedChunk.getMin());
+ ASSERT_EQ(chunkMax, mergedChunk.getMax());
+
+ {
+ // Check for increment on mergedChunk's minor version
+ ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
+ }
+}
+
+TEST_F(MergeChunkTest, MergeSeveralChunksCorrectlyShouldSucceed) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ // Construct chunks to be merged
+ auto chunk2(chunk);
+ auto chunk3(chunk);
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkBound = BSON("a" << 5);
+ auto chunkBound2 = BSON("a" << 7);
+ auto chunkMax = BSON("a" << 10);
+ // first chunk boundaries
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkBound);
+ // second chunk boundaries
+ chunk2.setMin(chunkBound);
+ chunk2.setMax(chunkBound2);
+ // third chunk boundaries
+ chunk3.setMin(chunkBound2);
+ chunk3.setMax(chunkMax);
+
+ // Record chunk boundaries for passing into commitChunkMerge
+ std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkBound2, chunkMax};
+
+ setupChunks({chunk, chunk2, chunk3});
+
+ ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ origVersion.epoch(),
+ chunkBoundaries,
+ "shard0000"));
+
+ auto findResponse = uassertStatusOK(
+ getConfigShard()->exhaustiveFindOnConfig(operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << "TestDB.TestColl"),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ boost::none));
+
+ const auto& chunksVector = findResponse.docs;
+
+ // There should be exactly one chunk left in the collection
+ ASSERT_EQ(1u, chunksVector.size());
+
+ // MergedChunk should have range [chunkMin, chunkMax]
+ auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front()));
+ ASSERT_EQ(chunkMin, mergedChunk.getMin());
+ ASSERT_EQ(chunkMax, mergedChunk.getMax());
+
+ {
+ // Check for increment on mergedChunk's minor version
+ ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
+ }
+}
+
+TEST_F(MergeChunkTest, NewMergeShouldClaimHighestVersion) {
+ ChunkType chunk, otherChunk;
+ chunk.setNS("TestDB.TestColl");
+ otherChunk.setNS("TestDB.TestColl");
+ auto collEpoch = OID::gen();
+
+ auto origVersion = ChunkVersion(1, 2, collEpoch);
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ // Construct chunk to be merged
+ auto chunk2(chunk);
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkBound = BSON("a" << 5);
+ auto chunkMax = BSON("a" << 10);
+ // first chunk boundaries
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkBound);
+ // second chunk boundaries
+ chunk2.setMin(chunkBound);
+ chunk2.setMax(chunkMax);
+
+ // Record chunk boundaries for passing into commitChunkMerge
+ std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkMax};
+
+ // Set up other chunk with competing version
+ auto competingVersion = ChunkVersion(2, 1, collEpoch);
+ otherChunk.setVersion(competingVersion);
+ otherChunk.setShard(ShardId("shard0000"));
+ otherChunk.setMin(BSON("a" << 10));
+ otherChunk.setMax(BSON("a" << 20));
+
+ setupChunks({chunk, chunk2, otherChunk});
+
+ ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ collEpoch,
+ chunkBoundaries,
+ "shard0000"));
+
+ auto findResponse = uassertStatusOK(
+ getConfigShard()->exhaustiveFindOnConfig(operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << "TestDB.TestColl"),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ boost::none));
+
+ const auto& chunksVector = findResponse.docs;
+
+ // There should be exactly two chunks left in the collection: one merged, one competing
+ ASSERT_EQ(2u, chunksVector.size());
+
+ // MergedChunk should have range [chunkMin, chunkMax]
+ auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front()));
+ ASSERT_EQ(chunkMin, mergedChunk.getMin());
+ ASSERT_EQ(chunkMax, mergedChunk.getMax());
+
+ {
+ // Check for minor increment on collection version
+ ASSERT_EQ(competingVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
+ ASSERT_EQ(competingVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
+ }
+}
+
+TEST_F(MergeChunkTest, MergeLeavesOtherChunksAlone) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 2, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ // Construct chunk to be merged
+ auto chunk2(chunk);
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkBound = BSON("a" << 5);
+ auto chunkMax = BSON("a" << 10);
+ // first chunk boundaries
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkBound);
+ // second chunk boundaries
+ chunk2.setMin(chunkBound);
+ chunk2.setMax(chunkMax);
+
+ // Record chunk boundaries for passing into commitChunkMerge
+ std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkMax};
+
+ // Set up unmerged chunk
+ auto otherChunk(chunk);
+ otherChunk.setMin(BSON("a" << 10));
+ otherChunk.setMax(BSON("a" << 20));
+
+ setupChunks({chunk, chunk2, otherChunk});
+
+ ASSERT_OK(catalogManager()->commitChunkMerge(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ origVersion.epoch(),
+ chunkBoundaries,
+ "shard0000"));
+
+ auto findResponse = uassertStatusOK(
+ getConfigShard()->exhaustiveFindOnConfig(operationContext(),
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << "TestDB.TestColl"),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ boost::none));
+
+ const auto& chunksVector = findResponse.docs;
+
+ // There should be exactly two chunks left in the collection: one merged, one untouched
+ ASSERT_EQ(2u, chunksVector.size());
+
+ // MergedChunk should have range [chunkMin, chunkMax]
+ auto mergedChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.front()));
+ ASSERT_EQ(chunkMin, mergedChunk.getMin());
+ ASSERT_EQ(chunkMax, mergedChunk.getMax());
+
+ {
+ // Check for increment on mergedChunk's minor version
+ ASSERT_EQ(origVersion.majorVersion(), mergedChunk.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 1, mergedChunk.getVersion().minorVersion());
+ }
+
+ // OtherChunk should have been left alone
+ auto foundOtherChunk = uassertStatusOK(ChunkType::fromBSON(chunksVector.back()));
+ ASSERT_EQ(otherChunk.getMin(), foundOtherChunk.getMin());
+ ASSERT_EQ(otherChunk.getMax(), foundOtherChunk.getMax());
+}
+
+TEST_F(MergeChunkTest, NonExistingNamespace) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ // Construct chunk to be merged
+ auto chunk2(chunk);
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkBound = BSON("a" << 5);
+ auto chunkMax = BSON("a" << 10);
+ // first chunk boundaries
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkBound);
+ chunk2.setMin(chunkBound);
+ chunk2.setMax(chunkMax);
+
+ // Record chunk boundaries for passing into commitChunkMerge
+ std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkMax};
+
+ setupChunks({chunk, chunk2});
+
+ auto mergeStatus = catalogManager()->commitChunkMerge(operationContext(),
+ NamespaceString("TestDB.NonExistingColl"),
+ origVersion.epoch(),
+ chunkBoundaries,
+ "shard0000");
+ ASSERT_EQ(ErrorCodes::IllegalOperation, mergeStatus);
+}
+
+TEST_F(MergeChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ // Construct chunk to be merged
+ auto chunk2(chunk);
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkBound = BSON("a" << 5);
+ auto chunkMax = BSON("a" << 10);
+ // first chunk boundaries
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkBound);
+ chunk2.setMin(chunkBound);
+ chunk2.setMax(chunkMax);
+
+    // Record chunk boundaries for passing into commitChunkMerge
+ std::vector<BSONObj> chunkBoundaries{chunkMin, chunkBound, chunkMax};
+
+ setupChunks({chunk, chunk2});
+
+ auto mergeStatus = catalogManager()->commitChunkMerge(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ OID::gen(),
+ chunkBoundaries,
+ "shard0000");
+ ASSERT_EQ(ErrorCodes::StaleEpoch, mergeStatus);
+}
+
+} // namespace
+} // namespace mongo
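A note on the boundary convention these tests exercise: N sorted boundary keys describe N-1 contiguous chunks, which is why merging two chunks passes three boundaries. A standalone sketch of that reconstruction, mirroring the chunk-building loop in commitChunkMerge but using plain ints in place of BSON keys (hypothetical helper, not part of the commit):

    #include <cstddef>
    #include <utility>
    #include <vector>

    // N boundaries -> N-1 half-open ranges [min, max), in order.
    std::vector<std::pair<int, int>> rangesFromBoundaries(const std::vector<int>& bounds) {
        std::vector<std::pair<int, int>> ranges;
        for (std::size_t i = 1; i < bounds.size(); ++i) {
            ranges.emplace_back(bounds[i - 1], bounds[i]);
        }
        return ranges;
    }

    // rangesFromBoundaries({1, 5, 10}) -> {[1,5), [5,10)}, which merge into [1,10).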
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index 58c39631fd6..a478d55f2d8 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -149,6 +149,16 @@ public:
const std::string& shardName) = 0;
/**
+ * Updates chunk metadata in config.chunks collection to reflect the given chunks being merged
+ * into a single larger chunk based on the specified boundaries of the smaller chunks.
+ */
+ virtual Status commitChunkMerge(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const std::vector<BSONObj>& chunkBoundaries,
+ const std::string& shardName) = 0;
+
+ /**
* Append information about the connection pools owned by the CatalogManager.
*/
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index 57acb99abae..577c179cd49 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -90,6 +90,14 @@ Status ShardingCatalogManagerMock::commitChunkSplit(OperationContext* txn,
return {ErrorCodes::InternalError, "Method not implemented"};
}
+Status ShardingCatalogManagerMock::commitChunkMerge(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const std::vector<BSONObj>& chunkBoundaries,
+ const std::string& shardName) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) {
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index 7c346bc3e91..1b49be01afe 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -76,6 +76,12 @@ public:
const std::vector<BSONObj>& splitPoints,
const std::string& shardName) override;
+ Status commitChunkMerge(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const std::vector<BSONObj>& chunkBoundaries,
+ const std::string& shardName) override;
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index e769876bdd9..546a1d43847 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -425,11 +425,10 @@ Status ConfigServerTestFixture::setupChunks(const std::vector<ChunkType>& chunks
StatusWith<ChunkType> ConfigServerTestFixture::getChunkDoc(OperationContext* txn,
const BSONObj& minKey) {
- auto doc =
- findOneOnConfigCollection(txn, NamespaceString(ChunkType::ConfigNS), BSON("min" << minKey));
- if (!doc.isOK()) {
+ auto doc = findOneOnConfigCollection(
+ txn, NamespaceString(ChunkType::ConfigNS), BSON(ChunkType::min() << minKey));
+ if (!doc.isOK())
return doc.getStatus();
- }
return ChunkType::fromBSON(doc.getValue());
}
diff --git a/src/mongo/s/request_types/merge_chunk_request_test.cpp b/src/mongo/s/request_types/merge_chunk_request_test.cpp
index a3610fbe5e0..2bca08c5fba 100644
--- a/src/mongo/s/request_types/merge_chunk_request_test.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_test.cpp
@@ -46,12 +46,15 @@ TEST(MergeChunkRequest, BasicValidConfigCommand) {
<< "collEpoch"
<< OID("7fffffff0000000000000001")
<< "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)))));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << "shard0000")));
ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace());
ASSERT_EQ(OID("7fffffff0000000000000001"), request.getEpoch());
ASSERT_EQ(BSON("a" << 1), request.getChunkBoundaries().at(0));
ASSERT_EQ(BSON("a" << 5), request.getChunkBoundaries().at(1));
ASSERT_EQ(BSON("a" << 10), request.getChunkBoundaries().at(2));
+ ASSERT_EQ("shard0000", request.getShardName());
}
TEST(MergeChunkRequest, ConfigCommandtoBSON) {
@@ -61,7 +64,9 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) {
<< "collEpoch"
<< OID("7fffffff0000000000000001")
<< "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << "shard0000");
BSONObj writeConcernObj = BSON("writeConcern" << BSON("w"
<< "majority"));
@@ -80,7 +85,9 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) {
TEST(MergeChunkRequest, MissingNameSpaceErrors) {
auto request = MergeChunkRequest::parseFromConfigCommand(
BSON("collEpoch" << OID("7fffffff0000000000000001") << "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
}
@@ -89,16 +96,30 @@ TEST(MergeChunkRequest, MissingCollEpochErrors) {
BSON("_configsvrMergeChunk"
<< "TestDB.TestColl"
<< "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
}
TEST(MergeChunkRequest, MissingChunkBoundariesErrors) {
- auto request =
- MergeChunkRequest::parseFromConfigCommand(BSON("_configsvrMergeChunk"
- << "TestDB.TestColl"
- << "collEpoch"
- << OID("7fffffff0000000000000001")));
+ auto request = MergeChunkRequest::parseFromConfigCommand(BSON("_configsvrMergeChunk"
+ << "TestDB.TestColl"
+ << "collEpoch"
+ << OID("7fffffff0000000000000001")
+ << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
+}
+
+TEST(MergeChunkRequest, MissingShardNameErrors) {
+ auto request = MergeChunkRequest::parseFromConfigCommand(
+ BSON("_configsvrMergeChunk"
+ << "TestDB.TestColl"
+ << "collEpoch"
+ << OID("7fffffff0000000000000001")
+ << "chunkBoundaries"
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))));
ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
}
@@ -106,7 +127,9 @@ TEST(MergeChunkRequest, WrongNamespaceTypeErrors) {
auto request = MergeChunkRequest::parseFromConfigCommand(BSON(
"_configsvrMergeChunk" << 1234 << "collEpoch" << OID("7fffffff0000000000000001")
<< "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
}
@@ -117,7 +140,9 @@ TEST(MergeChunkRequest, WrongCollEpochTypeErrors) {
<< "collEpoch"
<< 1234
<< "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
}
@@ -127,7 +152,22 @@ TEST(MergeChunkRequest, WrongChunkBoundariesTypeErrors) {
<< "collEpoch"
<< OID("7fffffff0000000000000001")
<< "chunkBoundaries"
- << 1234));
+ << 1234
+ << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
+}
+
+TEST(MergeChunkRequest, WrongShardNameTypeErrors) {
+ auto request = MergeChunkRequest::parseFromConfigCommand(
+ BSON("_configsvrMergeChunk"
+ << "TestDB.TestColl"
+ << "collEpoch"
+ << OID("7fffffff0000000000000001")
+ << "chunkBoundaries"
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << 1234));
ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
}
@@ -138,7 +178,9 @@ TEST(MergeChunkRequest, InvalidNamespaceErrors) {
<< "collEpoch"
<< OID("7fffffff0000000000000001")
<< "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::InvalidNamespace, request.getStatus());
}
@@ -148,7 +190,9 @@ TEST(MergeChunkRequest, EmptyChunkBoundariesErrors) {
<< "collEpoch"
<< OID("7fffffff0000000000000001")
<< "chunkBoundaries"
- << BSONArray()));
+ << BSONArray()
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::InvalidOptions, request.getStatus());
}
@@ -159,7 +203,9 @@ TEST(MergeChunkRequest, TooFewChunkBoundariesErrors) {
<< "collEpoch"
<< OID("7fffffff0000000000000001")
<< "chunkBoundaries"
- << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 10))));
+ << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 10))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::InvalidOptions, request.getStatus());
}
}
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp
index 75f19e7f610..508bf9b247d 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp
@@ -35,6 +35,7 @@
namespace mongo {
+using std::string;
using std::vector;
namespace {
@@ -42,18 +43,21 @@ namespace {
const char kConfigsvrMergeChunk[] = "_configsvrMergeChunk";
const char kCollEpoch[] = "collEpoch";
const char kChunkBoundaries[] = "chunkBoundaries";
+const char kShardName[] = "shard";
} // unnamed namespace
MergeChunkRequest::MergeChunkRequest(NamespaceString nss,
OID epoch,
- vector<BSONObj> chunkBoundaries)
+ vector<BSONObj> chunkBoundaries,
+ string shardName)
: _nss(std::move(nss)),
_epoch(std::move(epoch)),
- _chunkBoundaries(std::move(chunkBoundaries)) {}
+ _chunkBoundaries(std::move(chunkBoundaries)),
+ _shardName(std::move(shardName)) {}
StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BSONObj& cmdObj) {
- std::string ns;
+ string ns;
auto parseNamespaceStatus = bsonExtractStringField(cmdObj, kConfigsvrMergeChunk, &ns);
if (!parseNamespaceStatus.isOK()) {
@@ -82,8 +86,15 @@ StatusWith<MergeChunkRequest> MergeChunkRequest::parseFromConfigCommand(const BS
}
}
- auto request =
- MergeChunkRequest(NamespaceString(ns), std::move(epoch), std::move(chunkBoundaries));
+ string shardName;
+ auto parseShardNameStatus = bsonExtractStringField(cmdObj, kShardName, &shardName);
+
+ if (!parseShardNameStatus.isOK()) {
+ return parseShardNameStatus;
+ }
+
+ auto request = MergeChunkRequest(
+ NamespaceString(ns), std::move(epoch), std::move(chunkBoundaries), std::move(shardName));
Status validationStatus = request._validate();
if (!validationStatus.isOK()) {
return validationStatus;
@@ -111,6 +122,7 @@ void MergeChunkRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) {
chunkBoundariesArray.append(chunkBoundary);
}
}
+ cmdBuilder->append(kShardName, _shardName);
}
const NamespaceString& MergeChunkRequest::getNamespace() const {
@@ -125,6 +137,10 @@ const vector<BSONObj>& MergeChunkRequest::getChunkBoundaries() const {
return _chunkBoundaries;
}
+const string& MergeChunkRequest::getShardName() const {
+ return _shardName;
+}
+
Status MergeChunkRequest::_validate() {
if (!getNamespace().isValid()) {
return Status(ErrorCodes::InvalidNamespace,
diff --git a/src/mongo/s/request_types/merge_chunk_request_type.h b/src/mongo/s/request_types/merge_chunk_request_type.h
index 8be781f3626..5aa72ab6356 100644
--- a/src/mongo/s/request_types/merge_chunk_request_type.h
+++ b/src/mongo/s/request_types/merge_chunk_request_type.h
@@ -43,7 +43,10 @@ namespace mongo {
*/
class MergeChunkRequest {
public:
- MergeChunkRequest(NamespaceString nss, OID epoch, std::vector<BSONObj> chunkBoundaries);
+ MergeChunkRequest(NamespaceString nss,
+ OID epoch,
+ std::vector<BSONObj> chunkBoundaries,
+ std::string shardName);
/**
* Parses the provided BSON content as the internal _configsvrMergeChunk command, and if
@@ -57,6 +60,7 @@ public:
* <BSONObj key2>,
* ...
* ],
+ * shard: <string shard>
* }
*/
static StatusWith<MergeChunkRequest> parseFromConfigCommand(const BSONObj& cmdObj);
@@ -77,6 +81,7 @@ public:
const NamespaceString& getNamespace() const;
const OID& getEpoch() const;
const std::vector<BSONObj>& getChunkBoundaries() const;
+ const std::string& getShardName() const;
private:
/**
@@ -93,6 +98,7 @@ private:
* Specifies the boundaries of the chunks to be merged.
*/
std::vector<BSONObj> _chunkBoundaries;
+ std::string _shardName;
};
} // namespace mongo
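Putting the request type together, a minimal round-trip sketch using only the API added in this commit (assumes the MongoDB source tree; the helper name and field values are hypothetical):

    #include "mongo/db/jsobj.h"
    #include "mongo/s/request_types/merge_chunk_request_type.h"
    #include "mongo/util/assert_util.h"

    mongo::BSONObj roundTripExample() {
        using namespace mongo;
        BSONObj cmd = BSON("_configsvrMergeChunk"
                           << "TestDB.TestColl"
                           << "collEpoch" << OID("7fffffff0000000000000001")
                           << "chunkBoundaries"
                           << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10))
                           << "shard"
                           << "shard0000");
        // Parse, then re-serialize; the rebuilt object carries all four fields.
        MergeChunkRequest request = uassertStatusOK(MergeChunkRequest::parseFromConfigCommand(cmd));
        BSONObjBuilder rebuilt;
        request.appendAsConfigCommand(&rebuilt);
        return rebuilt.obj();
    }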