diff options
author | Jess Fan <jess.fan@10gen.com> | 2016-07-22 10:27:20 -0400 |
---|---|---|
committer | Jess Fan <jess.fan@10gen.com> | 2016-08-10 16:03:24 -0400 |
commit | e38918afa79bbb952e82a5127cb36d85f919703a (patch) | |
tree | 8b5c80383f716d0a481d4955c2eb5ae7dbb04720 | |
parent | 3ffcc4b4a105ccacb6c8d8863d44f88338395b3a (diff) | |
download | mongo-e38918afa79bbb952e82a5127cb36d85f919703a.tar.gz |
SERVER-24999 Built _configsvrSplitChunk wrapper around applyOps
-rw-r--r-- | src/mongo/db/catalog/apply_ops.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/s/config/configsvr_split_chunk_command.cpp | 123 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp | 210 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h | 27 | ||||
-rw-r--r-- | src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp | 262 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager.h | 11 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_mock.cpp | 10 | ||||
-rw-r--r-- | src/mongo/s/catalog/sharding_catalog_manager_mock.h | 7 | ||||
-rw-r--r-- | src/mongo/s/config_server_test_fixture.cpp | 23 | ||||
-rw-r--r-- | src/mongo/s/config_server_test_fixture.h | 11 | ||||
-rw-r--r-- | src/mongo/s/request_types/split_chunk_request_test.cpp | 86 | ||||
-rw-r--r-- | src/mongo/s/request_types/split_chunk_request_type.cpp | 25 | ||||
-rw-r--r-- | src/mongo/s/request_types/split_chunk_request_type.h | 8 |
14 files changed, 770 insertions, 48 deletions
diff --git a/src/mongo/db/catalog/apply_ops.cpp b/src/mongo/db/catalog/apply_ops.cpp index 267dd013324..28be87cf6de 100644 --- a/src/mongo/db/catalog/apply_ops.cpp +++ b/src/mongo/db/catalog/apply_ops.cpp @@ -212,18 +212,20 @@ Status _applyOps(OperationContext* txn, const BSONObj cmdRewritten = cmdBuilder.done(); + auto opObserver = getGlobalServiceContext()->getOpObserver(); + invariant(opObserver); if (haveWrappingWUOW) { - getGlobalServiceContext()->getOpObserver()->onApplyOps(txn, tempNS, cmdRewritten); + opObserver->onApplyOps(txn, tempNS, cmdRewritten); } else { // When executing applyOps outside of a wrapping WriteUnitOfWOrk, always logOp the - // command regardless of whether the individial ops succeeded and rely on any failures - // to also on secondaries. This isn't perfect, but it's what the command has always done - // and is part of its "correct" behavior. + // command regardless of whether the individial ops succeeded and rely on any + // failures to also on secondaries. This isn't perfect, but it's what the command + // has always done and is part of its "correct" behavior. while (true) { try { WriteUnitOfWork wunit(txn); - getGlobalServiceContext()->getOpObserver()->onApplyOps( - txn, tempNS, cmdRewritten); + opObserver->onApplyOps(txn, tempNS, cmdRewritten); + wunit.commit(); break; } catch (const WriteConflictException& wce) { diff --git a/src/mongo/db/s/config/configsvr_split_chunk_command.cpp b/src/mongo/db/s/config/configsvr_split_chunk_command.cpp new file mode 100644 index 00000000000..0799323550d --- /dev/null +++ b/src/mongo/db/s/config/configsvr_split_chunk_command.cpp @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/action_type.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/privilege.h" +#include "mongo/db/commands.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/request_types/split_chunk_request_type.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace { + +using std::string; + +/** + * Internal sharding command run on config servers to split a chunk. + * + * Format: + * { + * _configsvrSplitChunk: <string namespace>, + * collEpoch: <OID epoch>, + * min: <BSONObj chunkToSplitMin>, + * max: <BSONObj chunkToSplitMax>, + * splitPoints: [<BSONObj key>, ...], + * shard: <string shard>, + * writeConcern: <BSONObj> + * } + */ +class ConfigSvrSplitChunkCommand : public Command { +public: + ConfigSvrSplitChunkCommand() : Command("_configsvrSplitChunk") {} + + void help(std::stringstream& help) const override { + help << "Internal command, which is sent by a shard to the sharding config server. Do " + "not call directly. Receives, validates, and processes a SplitChunkRequest." + } + + bool slaveOk() const override { + return false; + } + + bool adminOnly() const override { + return true; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return true; + } + + Status checkAuthForCommand(ClientBasic* client, + const std::string& dbname, + const BSONObj& cmdObj) override { + if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource( + ResourcePattern::forClusterResource(), ActionType::internal)) { + return Status(ErrorCodes::Unauthorized, "Unauthorized"); + } + return Status::OK(); + } + + std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override { + return parseNsFullyQualified(dbname, cmdObj); + } + + bool run(OperationContext* txn, + const std::string& dbName, + BSONObj& cmdObj, + int options, + std::string& errmsg, + BSONObjBuilder& result) override { + if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { + uasserted(ErrorCodes::IllegalOperation, + "_configsvrSplitChunk can only be run on config servers"); + } + + auto parsedRequest = uassertStatusOK(SplitChunkRequest::parseFromConfigCommand(cmdObj)); + + Status splitChunkResult = + Grid::get(txn)->catalogManager()->commitChunkSplit(txn, + parsedRequest.getNamespace(), + parsedRequest.getEpoch(), + parsedRequest.getChunkRange(), + parsedRequest.getSplitPoints(), + parsedRequest.getShardName()); + uassertStatusOK(splitChunkResult); + + return true; + } +} configsvrSplitChunkCmd; +} // namespace +} // namespace mongo diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript index 6c8cc788e8d..9296788c94d 100644 --- a/src/mongo/s/catalog/replset/SConscript +++ b/src/mongo/s/catalog/replset/SConscript @@ -112,6 +112,7 @@ env.CppUnitTest( 'sharding_catalog_assign_key_range_to_zone_test.cpp', 'sharding_catalog_remove_shard_from_zone_test.cpp', 'sharding_catalog_config_initialization_test.cpp', + 'sharding_catalog_split_chunk_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/repl/replmocks', diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index ee81e49eb6f..dc288b6cde8 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -43,10 +43,12 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" -#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/commands.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" @@ -54,7 +56,6 @@ #include "mongo/s/balancer/balancer_policy.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/sharding_catalog_client.h" -#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_config_version.h" #include "mongo/s/catalog/type_database.h" @@ -103,12 +104,16 @@ const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode:: Lock::ResourceMutex kShardMembershipLock; /** - * Lock for shard zoning operations. This should be acquired when doing any operations that - * can affect the config.tags collection or the tags field of the config.shards collection. - * No other locks should be held when locking this. If an operation needs to take database locks - * (for example to write to a local collection) those locks should be taken after taking this. - */ -Lock::ResourceMutex kZoneOpLock; + * Append min, max and version information from chunk to the buffer for logChange purposes. +*/ +void appendShortVersion(BufBuilder* b, const ChunkType& chunk) { + BSONObjBuilder bb(*b); + bb.append(ChunkType::min(), chunk.getMin()); + bb.append(ChunkType::max(), chunk.getMax()); + if (chunk.isVersionSet()) + chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + bb.done(); +} /** * Checks if the given key range for the given namespace conflicts with an existing key range. @@ -636,7 +641,7 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( } // Only one addShard operation can be in progress at a time. - Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock); + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); // TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead. const std::shared_ptr<Shard> shard{ @@ -777,7 +782,7 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard( Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn, const std::string& shardName, const std::string& zoneName) { - Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock); + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); auto updateStatus = _catalogClient->updateConfigDocument( txn, @@ -802,7 +807,7 @@ Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn, Status ShardingCatalogManagerImpl::removeShardFromZone(OperationContext* txn, const std::string& shardName, const std::string& zoneName) { - Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock); + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard(); const NamespaceString shardNS(ShardType::ConfigNS); @@ -915,7 +920,7 @@ Status ShardingCatalogManagerImpl::assignKeyRangeToZone(OperationContext* txn, const NamespaceString& ns, const ChunkRange& givenRange, const string& zoneName) { - Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock); + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard(); @@ -977,7 +982,7 @@ Status ShardingCatalogManagerImpl::assignKeyRangeToZone(OperationContext* txn, Status ShardingCatalogManagerImpl::removeKeyRangeFromZone(OperationContext* txn, const NamespaceString& ns, const ChunkRange& range) { - Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock); + Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock); auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard(); @@ -996,6 +1001,163 @@ Status ShardingCatalogManagerImpl::removeKeyRangeFromZone(OperationContext* txn, txn, TagsType::ConfigNS, removeBuilder.obj(), kNoWaitWriteConcern); } +Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const ChunkRange& range, + const std::vector<BSONObj>& splitPoints, + const std::string& shardName) { + // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and + // migrations + // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/ + // move chunks on different collections to proceed in parallel + Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock); + + // Acquire GlobalLock in MODE_X twice to prevent yielding. + // GlobalLock and the following lock on config.chunks are only needed to support + // mixed-mode operation with mongoses from 3.2 + // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4 + Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX); + Lock::GlobalLock secondGlobalLock(txn->lockState(), MODE_X, UINT_MAX); + + // Acquire lock on config.chunks in MODE_X + AutoGetCollection autoColl(txn, NamespaceString(ChunkType::ConfigNS), MODE_X); + + // Get the chunk with highest version for this namespace + auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + txn, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + NamespaceString(ChunkType::ConfigNS), + BSON("ns" << ns.ns()), + BSON(ChunkType::DEPRECATED_lastmod << -1), + 1); + + if (!findStatus.isOK()) { + return findStatus.getStatus(); + } + + const auto& chunksVector = findStatus.getValue().docs; + if (chunksVector.empty()) + return {ErrorCodes::IllegalOperation, + "collection does not exist, isn't sharded, or has no chunks"}; + + ChunkVersion collVersion = + ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod()); + + // Return an error if epoch of chunk does not match epoch of request + if (collVersion.epoch() != requestEpoch) { + return {ErrorCodes::StaleEpoch, + "epoch of chunk does not match epoch of request. This most likely means " + "that the collection was dropped and re-created."}; + } + + std::vector<ChunkType> newChunks; + + auto newChunkBounds(splitPoints); + ChunkVersion currentMaxVersion = collVersion; + auto startKey = range.getMin(); + newChunkBounds.push_back( + range.getMax()); // makes it easier to have 'max' in the next loop. remove later. + + BSONArrayBuilder updates; + + for (const auto& endKey : newChunkBounds) { + // splits only update the 'minor' portion of version + currentMaxVersion.incMinor(); + + // build an update operation against the chunks collection of the config database + // with + // upsert true + BSONObjBuilder op; + op.append("op", "u"); + op.appendBool("b", true); + op.append("ns", ChunkType::ConfigNS); + + // add the modified (new) chunk information as the update object + BSONObjBuilder n(op.subobjStart("o")); + n.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); + currentMaxVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod()); + n.append(ChunkType::ns(), ns.ns()); + n.append(ChunkType::min(), startKey); + n.append(ChunkType::max(), endKey); + n.append(ChunkType::shard(), shardName); + n.done(); + + // add the chunk's _id as the query part of the update statement + BSONObjBuilder q(op.subobjStart("o2")); + q.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey)); + q.done(); + + updates.append(op.obj()); + + // remember this chunk info for logging later + ChunkType chunk; + chunk.setMin(startKey); + chunk.setMax(endKey); + chunk.setVersion(currentMaxVersion); + + newChunks.push_back(std::move(chunk)); + + startKey = endKey; + } + + newChunkBounds.pop_back(); // 'max' was used as sentinel + + BSONArrayBuilder preCond; + { + BSONObjBuilder b; + b.append("ns", ChunkType::ConfigNS); + b.append("q", + BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby" + << BSON(ChunkType::DEPRECATED_lastmod() << -1))); + { + BSONObjBuilder bb(b.subobjStart("res")); + collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod()); + } + preCond.append(b.obj()); + } + + // apply the batch of updates to remote and local metadata + Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated( + txn, updates.arr(), preCond.arr(), ns.ns(), currentMaxVersion); + if (!applyOpsStatus.isOK()) { + return applyOpsStatus; + } + + // log changes + BSONObjBuilder logDetail; + { + BSONObjBuilder b(logDetail.subobjStart("before")); + b.append(ChunkType::min(), range.getMin()); + b.append(ChunkType::max(), range.getMax()); + collVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod()); + } + + if (newChunks.size() == 2) { + appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]); + appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]); + + grid.catalogClient(txn)->logChange(txn, "split", ns.ns(), logDetail.obj()); + } else { + BSONObj beforeDetailObj = logDetail.obj(); + BSONObj firstDetailObj = beforeDetailObj.getOwned(); + const int newChunksSize = newChunks.size(); + + for (int i = 0; i < newChunksSize; i++) { + BSONObjBuilder chunkDetail; + chunkDetail.appendElements(beforeDetailObj); + chunkDetail.append("number", i + 1); + chunkDetail.append("of", newChunksSize); + appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]); + + grid.catalogClient(txn)->logChange(txn, "multi-split", ns.ns(), chunkDetail.obj()); + } + } + + return applyOpsStatus; +} + void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) { _executorForAddShard->appendConnectionStats(stats); } @@ -1252,7 +1414,8 @@ void ShardingCatalogManagerImpl::cancelAddShardTaskIfNeeded(const ShardId& shard auto cbHandle = _getAddShardHandle_inlock(shardId); _executorForAddShard->cancel(cbHandle); // Untrack the handle here so that if this shard is re-added before the CallbackCanceled - // status is delivered to the callback, a new addShard task for the shard will be created. + // status is delivered to the callback, a new addShard task for the shard will be + // created. _untrackAddShardHandle_inlock(shardId); } } @@ -1290,8 +1453,8 @@ void ShardingCatalogManagerImpl::_scheduleAddShardTask( auto swHost = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly}, Milliseconds(kDefaultFindHostMaxWaitTime)); if (!swHost.isOK()) { - // A 3.2 mongos must have previously successfully communicated with hosts in this shard, so - // a failure to find a host here is probably transient, and it is safe to retry. + // A 3.2 mongos must have previously successfully communicated with hosts in this shard, + // so a failure to find a host here is probably transient, and it is safe to retry. warning() << "Failed to find host for shard " << shardType << " when trying to upsert a shardIdentity document, " << causedBy(swHost.getStatus()); @@ -1408,7 +1571,8 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( // If the command succeeded, update config.shards to mark the shard as shardAware. - // Release the _addShardHandlesMutex before updating config.shards, since it involves disk I/O. + // Release the _addShardHandlesMutex before updating config.shards, since it involves disk + // I/O. // At worst, a redundant addShard task will be scheduled by a new primary if the current // primary fails during that write. lk.unlock(); @@ -1425,12 +1589,12 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse( // scope at the end of this code block. auto txnPtr = cc().makeOperationContext(); - // Use kNoWaitWriteConcern to prevent waiting in this callback, since we don't handle a failed - // response anyway. If the write is rolled back, the new config primary will attempt to - // initialize sharding awareness on this shard again, and this update to config.shards will be - // automatically retried then. If it fails because the shard was removed through the normal - // removeShard path (so the entry in config.shards was deleted), no new addShard task will get - // scheduled on the next transition to primary. + // Use kNoWaitWriteConcern to prevent waiting in this callback, since we don't handle a + // failed response anyway. If the write is rolled back, the new config primary will attempt to + // initialize sharding awareness on this shard again, and this update to config.shards will + // be automatically retried then. If it fails because the shard was removed through the normal + // removeShard path (so the entry in config.shards was deleted), no new addShard task will + // get scheduled on the next transition to primary. auto updateStatus = _catalogClient->updateConfigDocument( txnPtr.get(), ShardType::ConfigNS, diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h index 9b261906ecf..339e08522b0 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h @@ -30,8 +30,10 @@ #include <vector> +#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/executor/task_executor.h" #include "mongo/s/catalog/sharding_catalog_manager.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" #include "mongo/stdx/mutex.h" @@ -85,6 +87,13 @@ public: const NamespaceString& ns, const ChunkRange& range) override; + Status commitChunkSplit(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const ChunkRange& range, + const std::vector<BSONObj>& splitPoints, + const std::string& shardName) override; + void appendConnectionStats(executor::ConnectionPoolStats* stats) override; Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; @@ -281,6 +290,24 @@ private: // Protects the _addShardHandles map. stdx::mutex _addShardHandlesMutex; + + /** + * Lock for shard zoning operations. This should be acquired when doing any operations that + * can affect the config.tags collection or the tags field of the config.shards collection. + * No other locks should be held when locking this. If an operation needs to take database + * locks (for example to write to a local collection) those locks should be taken after + * taking this. + */ + Lock::ResourceMutex _kZoneOpLock; + + /** + * Lock for chunk split/merge/move operations. This should be acquired when doing split/merge/ + * move operations that can affect the config.chunks collection. + * No other locks should be held when locking this. If an operation needs to take database + * locks (for example to write to a local collection) those locks should be taken after + * taking this. + */ + Lock::ResourceMutex _kChunkOpLock; }; } // namespace mongo diff --git a/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp new file mode 100644 index 00000000000..3d78a9bfd16 --- /dev/null +++ b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp @@ -0,0 +1,262 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/read_preference.h" +#include "mongo/db/namespace_string.h" +#include "mongo/s/catalog/sharding_catalog_manager.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/config_server_test_fixture.h" + +namespace mongo { +namespace { + +using SplitChunkTest = ConfigServerTestFixture; + +TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + auto chunkMin = BSON("a" << 1); + auto chunkMax = BSON("a" << 10); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + + auto chunkSplitPoint = BSON("a" << 5); + std::vector<BSONObj> splitPoints{chunkSplitPoint}; + + setupChunks({chunk}); + + ASSERT_OK(catalogManager()->commitChunkSplit(operationContext(), + NamespaceString("TestDB.TestColl"), + origVersion.epoch(), + ChunkRange(chunkMin, chunkMax), + splitPoints, + "shard0000")); + + // First chunkDoc should have range [chunkMin, chunkSplitPoint] + auto chunkDocStatus = getChunkDoc(operationContext(), chunkMin); + ASSERT_OK(chunkDocStatus.getStatus()); + + auto chunkDoc = chunkDocStatus.getValue(); + ASSERT_EQ(chunkSplitPoint, chunkDoc.getMax()); + + // Check for increment on first chunkDoc's minor version + ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion()); + + // Second chunkDoc should have range [chunkSplitPoint, chunkMax] + auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint); + ASSERT_OK(otherChunkDocStatus.getStatus()); + + auto otherChunkDoc = otherChunkDocStatus.getValue(); + ASSERT_EQ(chunkMax, otherChunkDoc.getMax()); + + // Check for increment on second chunkDoc's minor version + ASSERT_EQ(origVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion()); +} + +TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + auto chunkMin = BSON("a" << 1); + auto chunkMax = BSON("a" << 10); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + + auto chunkSplitPoint = BSON("a" << 5); + auto chunkSplitPoint2 = BSON("a" << 7); + std::vector<BSONObj> splitPoints{chunkSplitPoint, chunkSplitPoint2}; + + setupChunks({chunk}); + + ASSERT_OK(catalogManager()->commitChunkSplit(operationContext(), + NamespaceString("TestDB.TestColl"), + origVersion.epoch(), + ChunkRange(chunkMin, chunkMax), + splitPoints, + "shard0000")); + + // First chunkDoc should have range [chunkMin, chunkSplitPoint] + auto chunkDocStatus = getChunkDoc(operationContext(), chunkMin); + ASSERT_OK(chunkDocStatus.getStatus()); + + auto chunkDoc = chunkDocStatus.getValue(); + ASSERT_EQ(chunkSplitPoint, chunkDoc.getMax()); + + // Check for increment on first chunkDoc's minor version + ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion()); + + // Second chunkDoc should have range [chunkSplitPoint, chunkSplitPoint2] + auto midChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint); + ASSERT_OK(midChunkDocStatus.getStatus()); + + auto midChunkDoc = midChunkDocStatus.getValue(); + ASSERT_EQ(chunkSplitPoint2, midChunkDoc.getMax()); + + // Check for increment on second chunkDoc's minor version + ASSERT_EQ(origVersion.majorVersion(), midChunkDoc.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 2, midChunkDoc.getVersion().minorVersion()); + + // Third chunkDoc should have range [chunkSplitPoint2, chunkMax] + auto lastChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint2); + ASSERT_OK(lastChunkDocStatus.getStatus()); + + auto lastChunkDoc = lastChunkDocStatus.getValue(); + ASSERT_EQ(chunkMax, lastChunkDoc.getMax()); + + { + // Check for increment on third chunkDoc's minor version + ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion()); + ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion()); + } +} + +TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) { + ChunkType chunk, chunk2; + chunk.setNS("TestDB.TestColl"); + chunk2.setNS("TestDB.TestColl"); + auto collEpoch = OID::gen(); + + // set up first chunk + auto origVersion = ChunkVersion(1, 2, collEpoch); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + auto chunkMin = BSON("a" << 1); + auto chunkMax = BSON("a" << 10); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + + auto chunkSplitPoint = BSON("a" << 5); + std::vector<BSONObj> splitPoints{chunkSplitPoint}; + + // set up second chunk (chunk2) + auto competingVersion = ChunkVersion(2, 1, collEpoch); + chunk2.setVersion(competingVersion); + chunk2.setShard(ShardId("shard0000")); + chunk2.setMin(BSON("a" << 10)); + chunk2.setMax(BSON("a" << 20)); + + setupChunks({chunk, chunk2}); + + ASSERT_OK(catalogManager()->commitChunkSplit(operationContext(), + NamespaceString("TestDB.TestColl"), + collEpoch, + ChunkRange(chunkMin, chunkMax), + splitPoints, + "shard0000")); + + // First chunkDoc should have range [chunkMin, chunkSplitPoint] + auto chunkDocStatus = getChunkDoc(operationContext(), chunkMin); + ASSERT_OK(chunkDocStatus.getStatus()); + + auto chunkDoc = chunkDocStatus.getValue(); + ASSERT_EQ(chunkSplitPoint, chunkDoc.getMax()); + + // Check for increment based on the competing chunk version + ASSERT_EQ(competingVersion.majorVersion(), chunkDoc.getVersion().majorVersion()); + ASSERT_EQ(competingVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion()); + + // Second chunkDoc should have range [chunkSplitPoint, chunkMax] + auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint); + ASSERT_OK(otherChunkDocStatus.getStatus()); + + auto otherChunkDoc = otherChunkDocStatus.getValue(); + ASSERT_EQ(chunkMax, otherChunkDoc.getMax()); + + // Check for increment based on the competing chunk version + ASSERT_EQ(competingVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion()); + ASSERT_EQ(competingVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion()); +} +TEST_F(SplitChunkTest, NonExisingNamespaceErrors) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + auto chunkMin = BSON("a" << 1); + auto chunkMax = BSON("a" << 10); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + + std::vector<BSONObj> splitPoints{BSON("a" << 5)}; + + setupChunks({chunk}); + + auto splitStatus = catalogManager()->commitChunkSplit(operationContext(), + NamespaceString("TestDB.NonExistingColl"), + origVersion.epoch(), + ChunkRange(chunkMin, chunkMax), + splitPoints, + "shard0000"); + ASSERT_EQ(ErrorCodes::IllegalOperation, splitStatus); +} + +TEST_F(SplitChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) { + ChunkType chunk; + chunk.setNS("TestDB.TestColl"); + + auto origVersion = ChunkVersion(1, 0, OID::gen()); + chunk.setVersion(origVersion); + chunk.setShard(ShardId("shard0000")); + + auto chunkMin = BSON("a" << 1); + auto chunkMax = BSON("a" << 10); + chunk.setMin(chunkMin); + chunk.setMax(chunkMax); + + std::vector<BSONObj> splitPoints{BSON("a" << 5)}; + + setupChunks({chunk}); + + auto splitStatus = catalogManager()->commitChunkSplit(operationContext(), + NamespaceString("TestDB.TestColl"), + OID::gen(), + ChunkRange(chunkMin, chunkMax), + splitPoints, + "shard0000"); + ASSERT_EQ(ErrorCodes::StaleEpoch, splitStatus); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h index c3fd3d5117e..58c39631fd6 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager.h +++ b/src/mongo/s/catalog/sharding_catalog_manager.h @@ -138,6 +138,17 @@ public: const ChunkRange& range) = 0; /** + * Updates chunk metadata in config.chunks collection to reflect the given chunk being split + * into multiple smaller chunks based on the specified split points. + */ + virtual Status commitChunkSplit(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const ChunkRange& range, + const std::vector<BSONObj>& splitPoints, + const std::string& shardName) = 0; + + /** * Append information about the connection pools owned by the CatalogManager. */ virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0; diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp index 6398f66ab3a..57acb99abae 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp @@ -36,6 +36,7 @@ namespace mongo { using std::string; +using std::vector; ShardingCatalogManagerMock::ShardingCatalogManagerMock() = default; @@ -80,6 +81,15 @@ Status ShardingCatalogManagerMock::removeKeyRangeFromZone(OperationContext* txn, return {ErrorCodes::InternalError, "Method not implemented"}; } +Status ShardingCatalogManagerMock::commitChunkSplit(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const ChunkRange& range, + const std::vector<BSONObj>& splitPoints, + const std::string& shardName) { + return {ErrorCodes::InternalError, "Method not implemented"}; +} + void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {} Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) { diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h index e4c79518c80..7c346bc3e91 100644 --- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h +++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h @@ -69,6 +69,13 @@ public: const NamespaceString& ns, const ChunkRange& range) override; + Status commitChunkSplit(OperationContext* txn, + const NamespaceString& ns, + const OID& requestEpoch, + const ChunkRange& range, + const std::vector<BSONObj>& splitPoints, + const std::string& shardName) override; + void appendConnectionStats(executor::ConnectionPoolStats* stats) override; Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override; diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp index e2eae165287..e769876bdd9 100644 --- a/src/mongo/s/config_server_test_fixture.cpp +++ b/src/mongo/s/config_server_test_fixture.cpp @@ -411,6 +411,29 @@ StatusWith<ShardType> ConfigServerTestFixture::getShardDoc(OperationContext* txn return ShardType::fromBSON(doc.getValue()); } +Status ConfigServerTestFixture::setupChunks(const std::vector<ChunkType>& chunks) { + const NamespaceString chunkNS(ChunkType::ConfigNS); + for (const auto& chunk : chunks) { + auto insertStatus = insertToConfigCollection(operationContext(), chunkNS, chunk.toBSON()); + if (!insertStatus.isOK()) { + return insertStatus; + } + } + + return Status::OK(); +} + +StatusWith<ChunkType> ConfigServerTestFixture::getChunkDoc(OperationContext* txn, + const BSONObj& minKey) { + auto doc = + findOneOnConfigCollection(txn, NamespaceString(ChunkType::ConfigNS), BSON("min" << minKey)); + if (!doc.isOK()) { + return doc.getStatus(); + } + + return ChunkType::fromBSON(doc.getValue()); +} + StatusWith<std::vector<BSONObj>> ConfigServerTestFixture::getIndexes(OperationContext* txn, const NamespaceString& ns) { auto configShard = getConfigShard(); diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h index 776b637a3af..fc929ca5a43 100644 --- a/src/mongo/s/config_server_test_fixture.h +++ b/src/mongo/s/config_server_test_fixture.h @@ -33,6 +33,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/executor/network_test_env.h" +#include "mongo/s/catalog/type_chunk.h" #include "mongo/unittest/unittest.h" #include "mongo/util/net/message_port_mock.h" @@ -147,6 +148,16 @@ public: StatusWith<ShardType> getShardDoc(OperationContext* txn, const std::string& shardId); /** + * Setup the config.chunks collection to contain the given chunks. + */ + Status setupChunks(const std::vector<ChunkType>& chunks); + + /** + * Retrieves the chunk document from the config server. + */ + StatusWith<ChunkType> getChunkDoc(OperationContext* txn, const BSONObj& minKey); + + /** * Returns the indexes definitions defined on a given collection. */ StatusWith<std::vector<BSONObj>> getIndexes(OperationContext* txn, const NamespaceString& ns); diff --git a/src/mongo/s/request_types/split_chunk_request_test.cpp b/src/mongo/s/request_types/split_chunk_request_test.cpp index c22ca9160c4..57ca18f6350 100644 --- a/src/mongo/s/request_types/split_chunk_request_test.cpp +++ b/src/mongo/s/request_types/split_chunk_request_test.cpp @@ -50,11 +50,14 @@ TEST(SplitChunkRequest, BasicValidConfigCommand) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5))))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000"))); ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace()); ASSERT_EQ(OID("7fffffff0000000000000001"), request.getEpoch()); ASSERT(ChunkRange(BSON("a" << 1), BSON("a" << 10)) == request.getChunkRange()); ASSERT_EQ(BSON("a" << 5), request.getSplitPoints().at(0)); + ASSERT_EQ("shard0000", request.getShardName()); } TEST(SplitChunkRequest, ValidWithMultipleSplits) { @@ -68,12 +71,15 @@ TEST(SplitChunkRequest, ValidWithMultipleSplits) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5) << BSON("a" << 7))))); + << BSON_ARRAY(BSON("a" << 5) << BSON("a" << 7)) + << "shard" + << "shard0000"))); ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace()); ASSERT_EQ(OID("7fffffff0000000000000001"), request.getEpoch()); ASSERT(ChunkRange(BSON("a" << 1), BSON("a" << 10)) == request.getChunkRange()); ASSERT_EQ(BSON("a" << 5), request.getSplitPoints().at(0)); ASSERT_EQ(BSON("a" << 7), request.getSplitPoints().at(1)); + ASSERT_EQ("shard0000", request.getShardName()); } TEST(SplitChunkRequest, ConfigCommandtoBSON) { @@ -86,7 +92,9 @@ TEST(SplitChunkRequest, ConfigCommandtoBSON) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000"); BSONObj writeConcernObj = BSON("writeConcern" << BSON("w" << "majority")); @@ -107,7 +115,9 @@ TEST(SplitChunkRequest, MissingNamespaceErrors) { BSON("collEpoch" << OID("7fffffff0000000000000001") << "min" << BSON("a" << 1) << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); } @@ -119,7 +129,9 @@ TEST(SplitChunkRequest, MissingCollEpochErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); } @@ -131,7 +143,9 @@ TEST(SplitChunkRequest, MissingChunkToSplitErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); } @@ -143,7 +157,23 @@ TEST(SplitChunkRequest, MissingSplitPointErrors) { << "min" << BSON("a" << 1) << "max" - << BSON("a" << 10))); + << BSON("a" << 10) + << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); +} + +TEST(SplitChunkRequest, MissingShardNameErrors) { + auto request = SplitChunkRequest::parseFromConfigCommand(BSON("_configsvrSplitChunk" + << "TestDB.TestColl" + << "collEpoch" + << OID("7fffffff0000000000000001") + << "min" + << BSON("a" << 1) + << "max" + << BSON("a" << 10) + << "splitPoints" + << BSON_ARRAY(BSON("a" << 5)))); ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus()); } @@ -154,7 +184,9 @@ TEST(SplitChunkRequest, WrongNamespaceTypeErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); } @@ -168,7 +200,9 @@ TEST(SplitChunkRequest, WrongCollEpochTypeErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); } @@ -182,7 +216,9 @@ TEST(SplitChunkRequest, WrongChunkToSplitTypeErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); } @@ -196,6 +232,24 @@ TEST(SplitChunkRequest, WrongSplitPointTypeErrors) { << "max" << BSON("a" << 10) << "splitPoints" + << 1234 + << "shard" + << "shard0000")); + ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); +} + +TEST(SplitChunkRequest, WrongShardNameTypeErrors) { + auto request = SplitChunkRequest::parseFromConfigCommand(BSON("_configsvrSplitChunk" + << "TestDB.TestColl" + << "collEpoch" + << OID("7fffffff0000000000000001") + << "min" + << BSON("a" << 1) + << "max" + << BSON("a" << 10) + << "splitPoints" + << BSON_ARRAY(BSON("a" << 5)) + << "shard" << 1234)); ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus()); } @@ -210,7 +264,9 @@ TEST(SplitChunkRequest, InvalidNamespaceErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::InvalidNamespace, request.getStatus()); } @@ -224,7 +280,9 @@ TEST(SplitChunkRequest, EmptyChunkToSplitErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSON_ARRAY(BSON("a" << 5)))); + << BSON_ARRAY(BSON("a" << 5)) + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::BadValue, request.getStatus()); } @@ -238,7 +296,9 @@ TEST(SplitChunkRequest, EmptySplitPointsErrors) { << "max" << BSON("a" << 10) << "splitPoints" - << BSONArray())); + << BSONArray() + << "shard" + << "shard0000")); ASSERT_EQ(ErrorCodes::InvalidOptions, request.getStatus()); } } diff --git a/src/mongo/s/request_types/split_chunk_request_type.cpp b/src/mongo/s/request_types/split_chunk_request_type.cpp index 0a52ec255ed..78c2c478d0c 100644 --- a/src/mongo/s/request_types/split_chunk_request_type.cpp +++ b/src/mongo/s/request_types/split_chunk_request_type.cpp @@ -35,6 +35,7 @@ namespace mongo { +using std::string; using std::vector; namespace { @@ -42,20 +43,23 @@ namespace { const char kConfigsvrSplitChunk[] = "_configsvrSplitChunk"; const char kCollEpoch[] = "collEpoch"; const char kSplitPoints[] = "splitPoints"; +const char kShardName[] = "shard"; } // unnamed namespace SplitChunkRequest::SplitChunkRequest(NamespaceString nss, OID epoch, ChunkRange chunkRange, - vector<BSONObj> splitPoints) + vector<BSONObj> splitPoints, + string shardName) : _nss(std::move(nss)), _epoch(std::move(epoch)), _chunkRange(std::move(chunkRange)), - _splitPoints(std::move(splitPoints)) {} + _splitPoints(std::move(splitPoints)), + _shardName(std::move(shardName)) {} StatusWith<SplitChunkRequest> SplitChunkRequest::parseFromConfigCommand(const BSONObj& cmdObj) { - std::string ns; + string ns; auto parseNamespaceStatus = bsonExtractStringField(cmdObj, kConfigsvrSplitChunk, &ns); if (!parseNamespaceStatus.isOK()) { @@ -90,10 +94,18 @@ StatusWith<SplitChunkRequest> SplitChunkRequest::parseFromConfigCommand(const BS } } + string shardName; + auto parseShardNameStatus = bsonExtractStringField(cmdObj, kShardName, &shardName); + + if (!parseShardNameStatus.isOK()) { + return parseShardNameStatus; + } + auto request = SplitChunkRequest(NamespaceString(ns), std::move(epoch), std::move(chunkRangeStatus.getValue()), - std::move(splitPoints)); + std::move(splitPoints), + std::move(shardName)); Status validationStatus = request._validate(); if (!validationStatus.isOK()) { return validationStatus; @@ -122,6 +134,7 @@ void SplitChunkRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) { splitPointsArray.append(splitPoint); } } + cmdBuilder->append(kShardName, _shardName); } const NamespaceString& SplitChunkRequest::getNamespace() const { @@ -140,6 +153,10 @@ const vector<BSONObj>& SplitChunkRequest::getSplitPoints() const { return _splitPoints; } +const string& SplitChunkRequest::getShardName() const { + return _shardName; +} + Status SplitChunkRequest::_validate() { if (!getNamespace().isValid()) { return Status(ErrorCodes::InvalidNamespace, diff --git a/src/mongo/s/request_types/split_chunk_request_type.h b/src/mongo/s/request_types/split_chunk_request_type.h index 79099b170af..35a4b767923 100644 --- a/src/mongo/s/request_types/split_chunk_request_type.h +++ b/src/mongo/s/request_types/split_chunk_request_type.h @@ -47,7 +47,8 @@ public: SplitChunkRequest(NamespaceString nss, OID epoch, ChunkRange chunkRange, - std::vector<BSONObj> splitPoints); + std::vector<BSONObj> splitPoints, + std::string shardName); /** * Parses the provided BSON content as the internal _configsvrSplitChunk command, and if @@ -58,7 +59,8 @@ public: * collEpoch: <OID epoch>, * min: <BSONObj chunkToSplitMin>, * max: <BSONObj chunkToSplitMax>, - * splitPoints: [<BSONObj key>, ...] + * splitPoints: [<BSONObj key>, ...], + * shard: <string shard> * } */ static StatusWith<SplitChunkRequest> parseFromConfigCommand(const BSONObj& cmdObj); @@ -80,6 +82,7 @@ public: const OID& getEpoch() const; const ChunkRange& getChunkRange() const; const std::vector<BSONObj>& getSplitPoints() const; + const std::string& getShardName() const; private: /** @@ -92,6 +95,7 @@ private: OID _epoch; ChunkRange _chunkRange; std::vector<BSONObj> _splitPoints; + std::string _shardName; }; } // namespace mongo |