author     Jess Fan <jess.fan@10gen.com>    2016-07-22 10:27:20 -0400
committer  Jess Fan <jess.fan@10gen.com>    2016-08-10 16:03:24 -0400
commit     e38918afa79bbb952e82a5127cb36d85f919703a (patch)
tree       8b5c80383f716d0a481d4955c2eb5ae7dbb04720
parent     3ffcc4b4a105ccacb6c8d8863d44f88338395b3a (diff)
download   mongo-e38918afa79bbb952e82a5127cb36d85f919703a.tar.gz
SERVER-24999 Built _configsvrSplitChunk wrapper around applyOps
-rw-r--r--  src/mongo/db/catalog/apply_ops.cpp                                 |  14
-rw-r--r--  src/mongo/db/s/config/configsvr_split_chunk_command.cpp            | 123
-rw-r--r--  src/mongo/s/catalog/replset/SConscript                             |   1
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp     | 210
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h       |  27
-rw-r--r--  src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp | 262
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager.h                     |  11
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.cpp              |  10
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.h                |   7
-rw-r--r--  src/mongo/s/config_server_test_fixture.cpp                         |  23
-rw-r--r--  src/mongo/s/config_server_test_fixture.h                           |  11
-rw-r--r--  src/mongo/s/request_types/split_chunk_request_test.cpp             |  86
-rw-r--r--  src/mongo/s/request_types/split_chunk_request_type.cpp             |  25
-rw-r--r--  src/mongo/s/request_types/split_chunk_request_type.h               |   8
14 files changed, 770 insertions(+), 48 deletions(-)
diff --git a/src/mongo/db/catalog/apply_ops.cpp b/src/mongo/db/catalog/apply_ops.cpp
index 267dd013324..28be87cf6de 100644
--- a/src/mongo/db/catalog/apply_ops.cpp
+++ b/src/mongo/db/catalog/apply_ops.cpp
@@ -212,18 +212,20 @@ Status _applyOps(OperationContext* txn,
const BSONObj cmdRewritten = cmdBuilder.done();
+ auto opObserver = getGlobalServiceContext()->getOpObserver();
+ invariant(opObserver);
if (haveWrappingWUOW) {
- getGlobalServiceContext()->getOpObserver()->onApplyOps(txn, tempNS, cmdRewritten);
+ opObserver->onApplyOps(txn, tempNS, cmdRewritten);
} else {
// When executing applyOps outside of a wrapping WriteUnitOfWork, always logOp the
- // command regardless of whether the individial ops succeeded and rely on any failures
- // to also on secondaries. This isn't perfect, but it's what the command has always done
- // and is part of its "correct" behavior.
+ // command regardless of whether the individual ops succeeded and rely on any
+ // failures to also happen on secondaries. This isn't perfect, but it's what the command
+ // has always done and is part of its "correct" behavior.
while (true) {
try {
WriteUnitOfWork wunit(txn);
- getGlobalServiceContext()->getOpObserver()->onApplyOps(
- txn, tempNS, cmdRewritten);
+ opObserver->onApplyOps(txn, tempNS, cmdRewritten);
+
wunit.commit();
break;
} catch (const WriteConflictException& wce) {
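
The hunk above hoists the OpObserver lookup out of the retry loop and keeps the usual write-conflict retry idiom intact. A minimal sketch of that idiom, where doWrite() stands in for a hypothetical operation that may throw WriteConflictException:

    while (true) {
        try {
            WriteUnitOfWork wunit(txn);
            doWrite(txn);    // hypothetical write that may throw WriteConflictException
            wunit.commit();  // nothing is durable until commit()
            break;
        } catch (const WriteConflictException& wce) {
            // A concurrent writer invalidated this unit of work; loop and retry it.
        }
    }
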
diff --git a/src/mongo/db/s/config/configsvr_split_chunk_command.cpp b/src/mongo/db/s/config/configsvr_split_chunk_command.cpp
new file mode 100644
index 00000000000..0799323550d
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_split_chunk_command.cpp
@@ -0,0 +1,123 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/action_type.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/privilege.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/s/request_types/split_chunk_request_type.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace {
+
+using std::string;
+
+/**
+ * Internal sharding command run on config servers to split a chunk.
+ *
+ * Format:
+ * {
+ * _configsvrSplitChunk: <string namespace>,
+ * collEpoch: <OID epoch>,
+ * min: <BSONObj chunkToSplitMin>,
+ * max: <BSONObj chunkToSplitMax>,
+ * splitPoints: [<BSONObj key>, ...],
+ * shard: <string shard>,
+ * writeConcern: <BSONObj>
+ * }
+ */
+class ConfigSvrSplitChunkCommand : public Command {
+public:
+ ConfigSvrSplitChunkCommand() : Command("_configsvrSplitChunk") {}
+
+ void help(std::stringstream& help) const override {
+ help << "Internal command, which is sent by a shard to the sharding config server. Do "
+ "not call directly. Receives, validates, and processes a SplitChunkRequest."
+ }
+
+ bool slaveOk() const override {
+ return false;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return true;
+ }
+
+ Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
+ if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+ ResourcePattern::forClusterResource(), ActionType::internal)) {
+ return Status(ErrorCodes::Unauthorized, "Unauthorized");
+ }
+ return Status::OK();
+ }
+
+ std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const override {
+ return parseNsFullyQualified(dbname, cmdObj);
+ }
+
+ bool run(OperationContext* txn,
+ const std::string& dbName,
+ BSONObj& cmdObj,
+ int options,
+ std::string& errmsg,
+ BSONObjBuilder& result) override {
+ if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
+ uasserted(ErrorCodes::IllegalOperation,
+ "_configsvrSplitChunk can only be run on config servers");
+ }
+
+ auto parsedRequest = uassertStatusOK(SplitChunkRequest::parseFromConfigCommand(cmdObj));
+
+ Status splitChunkResult =
+ Grid::get(txn)->catalogManager()->commitChunkSplit(txn,
+ parsedRequest.getNamespace(),
+ parsedRequest.getEpoch(),
+ parsedRequest.getChunkRange(),
+ parsedRequest.getSplitPoints(),
+ parsedRequest.getShardName());
+ uassertStatusOK(splitChunkResult);
+
+ return true;
+ }
+} configsvrSplitChunkCmd;
+} // namespace
+} // namespace mongo
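
For reference, a request matching the format documented above can be built with the BSON macros; the namespace, keys, and shard name here are illustrative values borrowed from the unit tests later in this diff:

    BSONObj cmd = BSON("_configsvrSplitChunk"
                       << "TestDB.TestColl"
                       << "collEpoch" << OID("7fffffff0000000000000001")
                       << "min" << BSON("a" << 1)
                       << "max" << BSON("a" << 10)
                       << "splitPoints" << BSON_ARRAY(BSON("a" << 5))
                       << "shard"
                       << "shard0000"
                       << "writeConcern" << BSON("w" << "majority"));
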
diff --git a/src/mongo/s/catalog/replset/SConscript b/src/mongo/s/catalog/replset/SConscript
index 6c8cc788e8d..9296788c94d 100644
--- a/src/mongo/s/catalog/replset/SConscript
+++ b/src/mongo/s/catalog/replset/SConscript
@@ -112,6 +112,7 @@ env.CppUnitTest(
'sharding_catalog_assign_key_range_to_zone_test.cpp',
'sharding_catalog_remove_shard_from_zone_test.cpp',
'sharding_catalog_config_initialization_test.cpp',
+ 'sharding_catalog_split_chunk_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/repl/replmocks',
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index ee81e49eb6f..dc288b6cde8 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -43,10 +43,12 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/client.h"
-#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
@@ -54,7 +56,6 @@
#include "mongo/s/balancer/balancer_policy.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
-#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
@@ -103,12 +104,16 @@ const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::
Lock::ResourceMutex kShardMembershipLock;
/**
- * Lock for shard zoning operations. This should be acquired when doing any operations that
- * can affect the config.tags collection or the tags field of the config.shards collection.
- * No other locks should be held when locking this. If an operation needs to take database locks
- * (for example to write to a local collection) those locks should be taken after taking this.
- */
-Lock::ResourceMutex kZoneOpLock;
+ * Appends min, max, and version information from the chunk to the buffer, for logChange purposes.
+ */
+void appendShortVersion(BufBuilder* b, const ChunkType& chunk) {
+ BSONObjBuilder bb(*b);
+ bb.append(ChunkType::min(), chunk.getMin());
+ bb.append(ChunkType::max(), chunk.getMax());
+ if (chunk.isVersionSet())
+ chunk.getVersion().addToBSON(bb, ChunkType::DEPRECATED_lastmod());
+ bb.done();
+}
/**
* Checks if the given key range for the given namespace conflicts with an existing key range.
@@ -636,7 +641,7 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
}
// Only one addShard operation can be in progress at a time.
- Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock);
+ Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock);
// TODO: Don't create a detached Shard object, create a detached RemoteCommandTargeter instead.
const std::shared_ptr<Shard> shard{
@@ -777,7 +782,7 @@ StatusWith<string> ShardingCatalogManagerImpl::addShard(
Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn,
const std::string& shardName,
const std::string& zoneName) {
- Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock);
+ Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock);
auto updateStatus = _catalogClient->updateConfigDocument(
txn,
@@ -802,7 +807,7 @@ Status ShardingCatalogManagerImpl::addShardToZone(OperationContext* txn,
Status ShardingCatalogManagerImpl::removeShardFromZone(OperationContext* txn,
const std::string& shardName,
const std::string& zoneName) {
- Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock);
+ Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock);
auto configShard = Grid::get(txn)->shardRegistry()->getConfigShard();
const NamespaceString shardNS(ShardType::ConfigNS);
@@ -915,7 +920,7 @@ Status ShardingCatalogManagerImpl::assignKeyRangeToZone(OperationContext* txn,
const NamespaceString& ns,
const ChunkRange& givenRange,
const string& zoneName) {
- Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock);
+ Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock);
auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard();
@@ -977,7 +982,7 @@ Status ShardingCatalogManagerImpl::assignKeyRangeToZone(OperationContext* txn,
Status ShardingCatalogManagerImpl::removeKeyRangeFromZone(OperationContext* txn,
const NamespaceString& ns,
const ChunkRange& range) {
- Lock::ExclusiveLock lk(txn->lockState(), kZoneOpLock);
+ Lock::ExclusiveLock lk(txn->lockState(), _kZoneOpLock);
auto configServer = Grid::get(txn)->shardRegistry()->getConfigShard();
@@ -996,6 +1001,163 @@ Status ShardingCatalogManagerImpl::removeKeyRangeFromZone(OperationContext* txn,
txn, TagsType::ConfigNS, removeBuilder.obj(), kNoWaitWriteConcern);
}
+Status ShardingCatalogManagerImpl::commitChunkSplit(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const ChunkRange& range,
+ const std::vector<BSONObj>& splitPoints,
+ const std::string& shardName) {
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations.
+ // TODO(SERVER-25359): Replace with a collection-specific lock map so that splits, merges,
+ // and chunk moves on different collections can proceed in parallel.
+ Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
+
+ // Acquire GlobalLock in MODE_X twice to prevent yielding.
+ // The GlobalLock and the following lock on config.chunks are only needed to support
+ // mixed-mode operation with mongoses from 3.2.
+ // TODO(SERVER-25337): Remove GlobalLock and config.chunks lock after 3.4
+ Lock::GlobalLock firstGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
+ Lock::GlobalLock secondGlobalLock(txn->lockState(), MODE_X, UINT_MAX);
+
+ // Acquire lock on config.chunks in MODE_X
+ AutoGetCollection autoColl(txn, NamespaceString(ChunkType::ConfigNS), MODE_X);
+
+ // Get the chunk with highest version for this namespace
+ auto findStatus = grid.shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON("ns" << ns.ns()),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ 1);
+
+ if (!findStatus.isOK()) {
+ return findStatus.getStatus();
+ }
+
+ const auto& chunksVector = findStatus.getValue().docs;
+ if (chunksVector.empty())
+ return {ErrorCodes::IllegalOperation,
+ "collection does not exist, isn't sharded, or has no chunks"};
+
+ ChunkVersion collVersion =
+ ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
+
+ // Return an error if epoch of chunk does not match epoch of request
+ if (collVersion.epoch() != requestEpoch) {
+ return {ErrorCodes::StaleEpoch,
+ "epoch of chunk does not match epoch of request. This most likely means "
+ "that the collection was dropped and re-created."};
+ }
+
+ std::vector<ChunkType> newChunks;
+
+ auto newChunkBounds(splitPoints);
+ ChunkVersion currentMaxVersion = collVersion;
+ auto startKey = range.getMin();
+ newChunkBounds.push_back(
+ range.getMax()); // sentinel so the loop below also emits the final chunk; popped afterwards
+
+ BSONArrayBuilder updates;
+
+ for (const auto& endKey : newChunkBounds) {
+ // splits only update the 'minor' portion of version
+ currentMaxVersion.incMinor();
+
+ // build an update operation against the chunks collection of the config database
+ // with upsert true
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", true);
+ op.append("ns", ChunkType::ConfigNS);
+
+ // add the modified (new) chunk information as the update object
+ BSONObjBuilder n(op.subobjStart("o"));
+ n.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey));
+ currentMaxVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
+ n.append(ChunkType::ns(), ns.ns());
+ n.append(ChunkType::min(), startKey);
+ n.append(ChunkType::max(), endKey);
+ n.append(ChunkType::shard(), shardName);
+ n.done();
+
+ // add the chunk's _id as the query part of the update statement
+ BSONObjBuilder q(op.subobjStart("o2"));
+ q.append(ChunkType::name(), ChunkType::genID(ns.ns(), startKey));
+ q.done();
+
+ updates.append(op.obj());
+
+ // remember this chunk info for logging later
+ ChunkType chunk;
+ chunk.setMin(startKey);
+ chunk.setMax(endKey);
+ chunk.setVersion(currentMaxVersion);
+
+ newChunks.push_back(std::move(chunk));
+
+ startKey = endKey;
+ }
+
+ newChunkBounds.pop_back(); // 'max' was used as sentinel
+
+ BSONArrayBuilder preCond;
+ {
+ BSONObjBuilder b;
+ b.append("ns", ChunkType::ConfigNS);
+ b.append("q",
+ BSON("query" << BSON(ChunkType::ns(ns.ns())) << "orderby"
+ << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
+ {
+ BSONObjBuilder bb(b.subobjStart("res"));
+ collVersion.addToBSON(bb, ChunkType::DEPRECATED_lastmod());
+ }
+ preCond.append(b.obj());
+ }
+
+ // apply the batch of updates to remote and local metadata
+ Status applyOpsStatus = grid.catalogClient(txn)->applyChunkOpsDeprecated(
+ txn, updates.arr(), preCond.arr(), ns.ns(), currentMaxVersion);
+ if (!applyOpsStatus.isOK()) {
+ return applyOpsStatus;
+ }
+
+ // log changes
+ BSONObjBuilder logDetail;
+ {
+ BSONObjBuilder b(logDetail.subobjStart("before"));
+ b.append(ChunkType::min(), range.getMin());
+ b.append(ChunkType::max(), range.getMax());
+ collVersion.addToBSON(b, ChunkType::DEPRECATED_lastmod());
+ }
+
+ if (newChunks.size() == 2) {
+ appendShortVersion(&logDetail.subobjStart("left"), newChunks[0]);
+ appendShortVersion(&logDetail.subobjStart("right"), newChunks[1]);
+
+ grid.catalogClient(txn)->logChange(txn, "split", ns.ns(), logDetail.obj());
+ } else {
+ BSONObj beforeDetailObj = logDetail.obj();
+ const int newChunksSize = newChunks.size();
+
+ for (int i = 0; i < newChunksSize; i++) {
+ BSONObjBuilder chunkDetail;
+ chunkDetail.appendElements(beforeDetailObj);
+ chunkDetail.append("number", i + 1);
+ chunkDetail.append("of", newChunksSize);
+ appendShortVersion(&chunkDetail.subobjStart("chunk"), newChunks[i]);
+
+ grid.catalogClient(txn)->logChange(txn, "multi-split", ns.ns(), chunkDetail.obj());
+ }
+ }
+
+ return applyOpsStatus;
+}
+
void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}
@@ -1252,7 +1414,8 @@ void ShardingCatalogManagerImpl::cancelAddShardTaskIfNeeded(const ShardId& shard
auto cbHandle = _getAddShardHandle_inlock(shardId);
_executorForAddShard->cancel(cbHandle);
// Untrack the handle here so that if this shard is re-added before the CallbackCanceled
- // status is delivered to the callback, a new addShard task for the shard will be created.
+ // status is delivered to the callback, a new addShard task for the shard will be
+ // created.
_untrackAddShardHandle_inlock(shardId);
}
}
@@ -1290,8 +1453,8 @@ void ShardingCatalogManagerImpl::_scheduleAddShardTask(
auto swHost = targeter->findHost(ReadPreferenceSetting{ReadPreference::PrimaryOnly},
Milliseconds(kDefaultFindHostMaxWaitTime));
if (!swHost.isOK()) {
- // A 3.2 mongos must have previously successfully communicated with hosts in this shard, so
- // a failure to find a host here is probably transient, and it is safe to retry.
+ // A 3.2 mongos must have previously successfully communicated with hosts in this shard,
+ // so a failure to find a host here is probably transient, and it is safe to retry.
warning() << "Failed to find host for shard " << shardType
<< " when trying to upsert a shardIdentity document, "
<< causedBy(swHost.getStatus());
@@ -1408,7 +1571,8 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse(
// If the command succeeded, update config.shards to mark the shard as shardAware.
- // Release the _addShardHandlesMutex before updating config.shards, since it involves disk I/O.
+ // Release the _addShardHandlesMutex before updating config.shards, since it involves disk
+ // I/O.
// At worst, a redundant addShard task will be scheduled by a new primary if the current
// primary fails during that write.
lk.unlock();
@@ -1425,12 +1589,12 @@ void ShardingCatalogManagerImpl::_handleAddShardTaskResponse(
// scope at the end of this code block.
auto txnPtr = cc().makeOperationContext();
- // Use kNoWaitWriteConcern to prevent waiting in this callback, since we don't handle a failed
- // response anyway. If the write is rolled back, the new config primary will attempt to
- // initialize sharding awareness on this shard again, and this update to config.shards will be
- // automatically retried then. If it fails because the shard was removed through the normal
- // removeShard path (so the entry in config.shards was deleted), no new addShard task will get
- // scheduled on the next transition to primary.
+ // Use kNoWaitWriteConcern to prevent waiting in this callback, since we don't handle a
+ // failed response anyway. If the write is rolled back, the new config primary will attempt to
+ // initialize sharding awareness on this shard again, and this update to config.shards will
+ // be automatically retried then. If it fails because the shard was removed through the normal
+ // removeShard path (so the entry in config.shards was deleted), no new addShard task will
+ // get scheduled on the next transition to primary.
auto updateStatus = _catalogClient->updateConfigDocument(
txnPtr.get(),
ShardType::ConfigNS,
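
Concretely, for a split of the range [{a: 1}, {a: 10}) at {a: 5} on shard0000, the loop in commitChunkSplit() above emits one upsert op per resulting chunk, roughly shaped as below. The "_id" string follows ChunkType::genID(), shown here in an assumed "ns-minkey" form, and the real "o" document also carries the incremented lastmod version:

    BSONObj op = BSON("op" << "u"                        // update
                      << "b" << true                     // upsert
                      << "ns" << "config.chunks"
                      << "o" << BSON("_id" << "TestDB.TestColl-a_1"
                                     << "ns" << "TestDB.TestColl"
                                     << "min" << BSON("a" << 1)
                                     << "max" << BSON("a" << 5)
                                     << "shard" << "shard0000")
                      << "o2" << BSON("_id" << "TestDB.TestColl-a_1"));
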
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
index 9b261906ecf..339e08522b0 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.h
@@ -30,8 +30,10 @@
#include <vector>
+#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/stdx/mutex.h"
@@ -85,6 +87,13 @@ public:
const NamespaceString& ns,
const ChunkRange& range) override;
+ Status commitChunkSplit(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const ChunkRange& range,
+ const std::vector<BSONObj>& splitPoints,
+ const std::string& shardName) override;
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
@@ -281,6 +290,24 @@ private:
// Protects the _addShardHandles map.
stdx::mutex _addShardHandlesMutex;
+
+ /**
+ * Lock for shard zoning operations. This should be acquired when doing any operations that
+ * can affect the config.tags collection or the tags field of the config.shards collection.
+ * No other locks should be held when locking this. If an operation needs to take database
+ * locks (for example to write to a local collection) those locks should be taken after
+ * taking this.
+ */
+ Lock::ResourceMutex _kZoneOpLock;
+
+ /**
+ * Lock for chunk split/merge/move operations. This should be acquired when doing split/merge/
+ * move operations that can affect the config.chunks collection.
+ * No other locks should be held when locking this. If an operation needs to take database
+ * locks (for example to write to a local collection) those locks should be taken after
+ * taking this.
+ */
+ Lock::ResourceMutex _kChunkOpLock;
};
} // namespace mongo
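
Both mutexes document the same ordering rule: take the resource mutex first and any database locks after. A sketch of how commitChunkSplit() above observes it:

    Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);  // chunk-op mutex first...
    AutoGetCollection autoColl(                               // ...collection lock after
        txn, NamespaceString(ChunkType::ConfigNS), MODE_X);
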
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp
new file mode 100644
index 00000000000..3d78a9bfd16
--- /dev/null
+++ b/src/mongo/s/catalog/replset/sharding_catalog_split_chunk_test.cpp
@@ -0,0 +1,262 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/read_preference.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/config_server_test_fixture.h"
+
+namespace mongo {
+namespace {
+
+using SplitChunkTest = ConfigServerTestFixture;
+
+TEST_F(SplitChunkTest, SplitExistingChunkCorrectlyShouldSucceed) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkMax = BSON("a" << 10);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+
+ auto chunkSplitPoint = BSON("a" << 5);
+ std::vector<BSONObj> splitPoints{chunkSplitPoint};
+
+ setupChunks({chunk});
+
+ ASSERT_OK(catalogManager()->commitChunkSplit(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ origVersion.epoch(),
+ ChunkRange(chunkMin, chunkMax),
+ splitPoints,
+ "shard0000"));
+
+ // First chunkDoc should have range [chunkMin, chunkSplitPoint)
+ auto chunkDocStatus = getChunkDoc(operationContext(), chunkMin);
+ ASSERT_OK(chunkDocStatus.getStatus());
+
+ auto chunkDoc = chunkDocStatus.getValue();
+ ASSERT_EQ(chunkSplitPoint, chunkDoc.getMax());
+
+ // Check for increment on first chunkDoc's minor version
+ ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+
+ // Second chunkDoc should have range [chunkSplitPoint, chunkMax)
+ auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
+ ASSERT_OK(otherChunkDocStatus.getStatus());
+
+ auto otherChunkDoc = otherChunkDocStatus.getValue();
+ ASSERT_EQ(chunkMax, otherChunkDoc.getMax());
+
+ // Check for increment on second chunkDoc's minor version
+ ASSERT_EQ(origVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+}
+
+TEST_F(SplitChunkTest, MultipleSplitsOnExistingChunkShouldSucceed) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkMax = BSON("a" << 10);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+
+ auto chunkSplitPoint = BSON("a" << 5);
+ auto chunkSplitPoint2 = BSON("a" << 7);
+ std::vector<BSONObj> splitPoints{chunkSplitPoint, chunkSplitPoint2};
+
+ setupChunks({chunk});
+
+ ASSERT_OK(catalogManager()->commitChunkSplit(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ origVersion.epoch(),
+ ChunkRange(chunkMin, chunkMax),
+ splitPoints,
+ "shard0000"));
+
+ // First chunkDoc should have range [chunkMin, chunkSplitPoint)
+ auto chunkDocStatus = getChunkDoc(operationContext(), chunkMin);
+ ASSERT_OK(chunkDocStatus.getStatus());
+
+ auto chunkDoc = chunkDocStatus.getValue();
+ ASSERT_EQ(chunkSplitPoint, chunkDoc.getMax());
+
+ // Check for increment on first chunkDoc's minor version
+ ASSERT_EQ(origVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+
+ // Second chunkDoc should have range [chunkSplitPoint, chunkSplitPoint2)
+ auto midChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
+ ASSERT_OK(midChunkDocStatus.getStatus());
+
+ auto midChunkDoc = midChunkDocStatus.getValue();
+ ASSERT_EQ(chunkSplitPoint2, midChunkDoc.getMax());
+
+ // Check for increment on second chunkDoc's minor version
+ ASSERT_EQ(origVersion.majorVersion(), midChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 2, midChunkDoc.getVersion().minorVersion());
+
+ // Third chunkDoc should have range [chunkSplitPoint2, chunkMax)
+ auto lastChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint2);
+ ASSERT_OK(lastChunkDocStatus.getStatus());
+
+ auto lastChunkDoc = lastChunkDocStatus.getValue();
+ ASSERT_EQ(chunkMax, lastChunkDoc.getMax());
+
+ // Check for increment on third chunkDoc's minor version
+ ASSERT_EQ(origVersion.majorVersion(), lastChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(origVersion.minorVersion() + 3, lastChunkDoc.getVersion().minorVersion());
+}
+
+TEST_F(SplitChunkTest, NewSplitShouldClaimHighestVersion) {
+ ChunkType chunk, chunk2;
+ chunk.setNS("TestDB.TestColl");
+ chunk2.setNS("TestDB.TestColl");
+ auto collEpoch = OID::gen();
+
+ // set up first chunk
+ auto origVersion = ChunkVersion(1, 2, collEpoch);
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkMax = BSON("a" << 10);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+
+ auto chunkSplitPoint = BSON("a" << 5);
+ std::vector<BSONObj> splitPoints{chunkSplitPoint};
+
+ // set up second chunk (chunk2)
+ auto competingVersion = ChunkVersion(2, 1, collEpoch);
+ chunk2.setVersion(competingVersion);
+ chunk2.setShard(ShardId("shard0000"));
+ chunk2.setMin(BSON("a" << 10));
+ chunk2.setMax(BSON("a" << 20));
+
+ setupChunks({chunk, chunk2});
+
+ ASSERT_OK(catalogManager()->commitChunkSplit(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ collEpoch,
+ ChunkRange(chunkMin, chunkMax),
+ splitPoints,
+ "shard0000"));
+
+ // First chunkDoc should have range [chunkMin, chunkSplitPoint)
+ auto chunkDocStatus = getChunkDoc(operationContext(), chunkMin);
+ ASSERT_OK(chunkDocStatus.getStatus());
+
+ auto chunkDoc = chunkDocStatus.getValue();
+ ASSERT_EQ(chunkSplitPoint, chunkDoc.getMax());
+
+ // Check for increment based on the competing chunk version
+ ASSERT_EQ(competingVersion.majorVersion(), chunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(competingVersion.minorVersion() + 1, chunkDoc.getVersion().minorVersion());
+
+ // Second chunkDoc should have range [chunkSplitPoint, chunkMax)
+ auto otherChunkDocStatus = getChunkDoc(operationContext(), chunkSplitPoint);
+ ASSERT_OK(otherChunkDocStatus.getStatus());
+
+ auto otherChunkDoc = otherChunkDocStatus.getValue();
+ ASSERT_EQ(chunkMax, otherChunkDoc.getMax());
+
+ // Check for increment based on the competing chunk version
+ ASSERT_EQ(competingVersion.majorVersion(), otherChunkDoc.getVersion().majorVersion());
+ ASSERT_EQ(competingVersion.minorVersion() + 2, otherChunkDoc.getVersion().minorVersion());
+}
+
+TEST_F(SplitChunkTest, NonExistingNamespaceErrors) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkMax = BSON("a" << 10);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+
+ std::vector<BSONObj> splitPoints{BSON("a" << 5)};
+
+ setupChunks({chunk});
+
+ auto splitStatus = catalogManager()->commitChunkSplit(operationContext(),
+ NamespaceString("TestDB.NonExistingColl"),
+ origVersion.epoch(),
+ ChunkRange(chunkMin, chunkMax),
+ splitPoints,
+ "shard0000");
+ ASSERT_EQ(ErrorCodes::IllegalOperation, splitStatus);
+}
+
+TEST_F(SplitChunkTest, NonMatchingEpochsOfChunkAndRequestErrors) {
+ ChunkType chunk;
+ chunk.setNS("TestDB.TestColl");
+
+ auto origVersion = ChunkVersion(1, 0, OID::gen());
+ chunk.setVersion(origVersion);
+ chunk.setShard(ShardId("shard0000"));
+
+ auto chunkMin = BSON("a" << 1);
+ auto chunkMax = BSON("a" << 10);
+ chunk.setMin(chunkMin);
+ chunk.setMax(chunkMax);
+
+ std::vector<BSONObj> splitPoints{BSON("a" << 5)};
+
+ setupChunks({chunk});
+
+ auto splitStatus = catalogManager()->commitChunkSplit(operationContext(),
+ NamespaceString("TestDB.TestColl"),
+ OID::gen(),
+ ChunkRange(chunkMin, chunkMax),
+ splitPoints,
+ "shard0000");
+ ASSERT_EQ(ErrorCodes::StaleEpoch, splitStatus);
+}
+
+} // namespace
+} // namespace mongo
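
The version assertions above all rely on the rule that a split bumps only the minor component of the highest chunk version in the collection. A minimal sketch of that arithmetic:

    ChunkVersion v(2, 1, collEpoch);  // (major, minor, epoch), as in the competing chunk
    v.incMinor();                     // -> (2, 2): splits never change the major version
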
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index c3fd3d5117e..58c39631fd6 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -138,6 +138,17 @@ public:
const ChunkRange& range) = 0;
/**
+ * Updates chunk metadata in the config.chunks collection to reflect the given chunk being
+ * split into multiple smaller chunks based on the specified split points.
+ */
+ virtual Status commitChunkSplit(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const ChunkRange& range,
+ const std::vector<BSONObj>& splitPoints,
+ const std::string& shardName) = 0;
+
+ /**
* Append information about the connection pools owned by the CatalogManager.
*/
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index 6398f66ab3a..57acb99abae 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -36,6 +36,7 @@
namespace mongo {
using std::string;
+using std::vector;
ShardingCatalogManagerMock::ShardingCatalogManagerMock() = default;
@@ -80,6 +81,15 @@ Status ShardingCatalogManagerMock::removeKeyRangeFromZone(OperationContext* txn,
return {ErrorCodes::InternalError, "Method not implemented"};
}
+Status ShardingCatalogManagerMock::commitChunkSplit(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const ChunkRange& range,
+ const std::vector<BSONObj>& splitPoints,
+ const std::string& shardName) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) {
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index e4c79518c80..7c346bc3e91 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -69,6 +69,13 @@ public:
const NamespaceString& ns,
const ChunkRange& range) override;
+ Status commitChunkSplit(OperationContext* txn,
+ const NamespaceString& ns,
+ const OID& requestEpoch,
+ const ChunkRange& range,
+ const std::vector<BSONObj>& splitPoints,
+ const std::string& shardName) override;
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
diff --git a/src/mongo/s/config_server_test_fixture.cpp b/src/mongo/s/config_server_test_fixture.cpp
index e2eae165287..e769876bdd9 100644
--- a/src/mongo/s/config_server_test_fixture.cpp
+++ b/src/mongo/s/config_server_test_fixture.cpp
@@ -411,6 +411,29 @@ StatusWith<ShardType> ConfigServerTestFixture::getShardDoc(OperationContext* txn
return ShardType::fromBSON(doc.getValue());
}
+Status ConfigServerTestFixture::setupChunks(const std::vector<ChunkType>& chunks) {
+ const NamespaceString chunkNS(ChunkType::ConfigNS);
+ for (const auto& chunk : chunks) {
+ auto insertStatus = insertToConfigCollection(operationContext(), chunkNS, chunk.toBSON());
+ if (!insertStatus.isOK()) {
+ return insertStatus;
+ }
+ }
+
+ return Status::OK();
+}
+
+StatusWith<ChunkType> ConfigServerTestFixture::getChunkDoc(OperationContext* txn,
+ const BSONObj& minKey) {
+ auto doc =
+ findOneOnConfigCollection(txn, NamespaceString(ChunkType::ConfigNS), BSON("min" << minKey));
+ if (!doc.isOK()) {
+ return doc.getStatus();
+ }
+
+ return ChunkType::fromBSON(doc.getValue());
+}
+
StatusWith<std::vector<BSONObj>> ConfigServerTestFixture::getIndexes(OperationContext* txn,
const NamespaceString& ns) {
auto configShard = getConfigShard();
diff --git a/src/mongo/s/config_server_test_fixture.h b/src/mongo/s/config_server_test_fixture.h
index 776b637a3af..fc929ca5a43 100644
--- a/src/mongo/s/config_server_test_fixture.h
+++ b/src/mongo/s/config_server_test_fixture.h
@@ -33,6 +33,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/executor/network_test_env.h"
+#include "mongo/s/catalog/type_chunk.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/net/message_port_mock.h"
@@ -147,6 +148,16 @@ public:
StatusWith<ShardType> getShardDoc(OperationContext* txn, const std::string& shardId);
/**
+ * Sets up the config.chunks collection to contain the given chunks.
+ */
+ Status setupChunks(const std::vector<ChunkType>& chunks);
+
+ /**
+ * Retrieves from the config server the chunk document whose min bound matches the given key.
+ */
+ StatusWith<ChunkType> getChunkDoc(OperationContext* txn, const BSONObj& minKey);
+
+ /**
* Returns the indexes definitions defined on a given collection.
*/
StatusWith<std::vector<BSONObj>> getIndexes(OperationContext* txn, const NamespaceString& ns);
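
A usage sketch of the two new helpers, following the pattern of the split-chunk tests earlier in this diff:

    ChunkType chunk;
    chunk.setNS("TestDB.TestColl");
    chunk.setVersion(ChunkVersion(1, 0, OID::gen()));
    chunk.setShard(ShardId("shard0000"));
    chunk.setMin(BSON("a" << 1));
    chunk.setMax(BSON("a" << 10));
    ASSERT_OK(setupChunks({chunk}));                             // seed config.chunks
    auto doc = getChunkDoc(operationContext(), BSON("a" << 1));  // look up by min key
    ASSERT_OK(doc.getStatus());
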
diff --git a/src/mongo/s/request_types/split_chunk_request_test.cpp b/src/mongo/s/request_types/split_chunk_request_test.cpp
index c22ca9160c4..57ca18f6350 100644
--- a/src/mongo/s/request_types/split_chunk_request_test.cpp
+++ b/src/mongo/s/request_types/split_chunk_request_test.cpp
@@ -50,11 +50,14 @@ TEST(SplitChunkRequest, BasicValidConfigCommand) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5)))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000")));
ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace());
ASSERT_EQ(OID("7fffffff0000000000000001"), request.getEpoch());
ASSERT(ChunkRange(BSON("a" << 1), BSON("a" << 10)) == request.getChunkRange());
ASSERT_EQ(BSON("a" << 5), request.getSplitPoints().at(0));
+ ASSERT_EQ("shard0000", request.getShardName());
}
TEST(SplitChunkRequest, ValidWithMultipleSplits) {
@@ -68,12 +71,15 @@ TEST(SplitChunkRequest, ValidWithMultipleSplits) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5) << BSON("a" << 7)))));
+ << BSON_ARRAY(BSON("a" << 5) << BSON("a" << 7))
+ << "shard"
+ << "shard0000")));
ASSERT_EQ(NamespaceString("TestDB", "TestColl"), request.getNamespace());
ASSERT_EQ(OID("7fffffff0000000000000001"), request.getEpoch());
ASSERT(ChunkRange(BSON("a" << 1), BSON("a" << 10)) == request.getChunkRange());
ASSERT_EQ(BSON("a" << 5), request.getSplitPoints().at(0));
ASSERT_EQ(BSON("a" << 7), request.getSplitPoints().at(1));
+ ASSERT_EQ("shard0000", request.getShardName());
}
TEST(SplitChunkRequest, ConfigCommandtoBSON) {
@@ -86,7 +92,9 @@ TEST(SplitChunkRequest, ConfigCommandtoBSON) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5)));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000");
BSONObj writeConcernObj = BSON("writeConcern" << BSON("w"
<< "majority"));
@@ -107,7 +115,9 @@ TEST(SplitChunkRequest, MissingNamespaceErrors) {
BSON("collEpoch" << OID("7fffffff0000000000000001") << "min" << BSON("a" << 1) << "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
}
@@ -119,7 +129,9 @@ TEST(SplitChunkRequest, MissingCollEpochErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
}
@@ -131,7 +143,9 @@ TEST(SplitChunkRequest, MissingChunkToSplitErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
}
@@ -143,7 +157,23 @@ TEST(SplitChunkRequest, MissingSplitPointErrors) {
<< "min"
<< BSON("a" << 1)
<< "max"
- << BSON("a" << 10)));
+ << BSON("a" << 10)
+ << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
+}
+
+TEST(SplitChunkRequest, MissingShardNameErrors) {
+ auto request = SplitChunkRequest::parseFromConfigCommand(BSON("_configsvrSplitChunk"
+ << "TestDB.TestColl"
+ << "collEpoch"
+ << OID("7fffffff0000000000000001")
+ << "min"
+ << BSON("a" << 1)
+ << "max"
+ << BSON("a" << 10)
+ << "splitPoints"
+ << BSON_ARRAY(BSON("a" << 5))));
ASSERT_EQ(ErrorCodes::NoSuchKey, request.getStatus());
}
@@ -154,7 +184,9 @@ TEST(SplitChunkRequest, WrongNamespaceTypeErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
}
@@ -168,7 +200,9 @@ TEST(SplitChunkRequest, WrongCollEpochTypeErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
}
@@ -182,7 +216,9 @@ TEST(SplitChunkRequest, WrongChunkToSplitTypeErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
}
@@ -196,6 +232,24 @@ TEST(SplitChunkRequest, WrongSplitPointTypeErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
+ << 1234
+ << "shard"
+ << "shard0000"));
+ ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
+}
+
+TEST(SplitChunkRequest, WrongShardNameTypeErrors) {
+ auto request = SplitChunkRequest::parseFromConfigCommand(BSON("_configsvrSplitChunk"
+ << "TestDB.TestColl"
+ << "collEpoch"
+ << OID("7fffffff0000000000000001")
+ << "min"
+ << BSON("a" << 1)
+ << "max"
+ << BSON("a" << 10)
+ << "splitPoints"
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
<< 1234));
ASSERT_EQ(ErrorCodes::TypeMismatch, request.getStatus());
}
@@ -210,7 +264,9 @@ TEST(SplitChunkRequest, InvalidNamespaceErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::InvalidNamespace, request.getStatus());
}
@@ -224,7 +280,9 @@ TEST(SplitChunkRequest, EmptyChunkToSplitErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSON_ARRAY(BSON("a" << 5))));
+ << BSON_ARRAY(BSON("a" << 5))
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::BadValue, request.getStatus());
}
@@ -238,7 +296,9 @@ TEST(SplitChunkRequest, EmptySplitPointsErrors) {
<< "max"
<< BSON("a" << 10)
<< "splitPoints"
- << BSONArray()));
+ << BSONArray()
+ << "shard"
+ << "shard0000"));
ASSERT_EQ(ErrorCodes::InvalidOptions, request.getStatus());
}
}
diff --git a/src/mongo/s/request_types/split_chunk_request_type.cpp b/src/mongo/s/request_types/split_chunk_request_type.cpp
index 0a52ec255ed..78c2c478d0c 100644
--- a/src/mongo/s/request_types/split_chunk_request_type.cpp
+++ b/src/mongo/s/request_types/split_chunk_request_type.cpp
@@ -35,6 +35,7 @@
namespace mongo {
+using std::string;
using std::vector;
namespace {
@@ -42,20 +43,23 @@ namespace {
const char kConfigsvrSplitChunk[] = "_configsvrSplitChunk";
const char kCollEpoch[] = "collEpoch";
const char kSplitPoints[] = "splitPoints";
+const char kShardName[] = "shard";
} // unnamed namespace
SplitChunkRequest::SplitChunkRequest(NamespaceString nss,
OID epoch,
ChunkRange chunkRange,
- vector<BSONObj> splitPoints)
+ vector<BSONObj> splitPoints,
+ string shardName)
: _nss(std::move(nss)),
_epoch(std::move(epoch)),
_chunkRange(std::move(chunkRange)),
- _splitPoints(std::move(splitPoints)) {}
+ _splitPoints(std::move(splitPoints)),
+ _shardName(std::move(shardName)) {}
StatusWith<SplitChunkRequest> SplitChunkRequest::parseFromConfigCommand(const BSONObj& cmdObj) {
- std::string ns;
+ string ns;
auto parseNamespaceStatus = bsonExtractStringField(cmdObj, kConfigsvrSplitChunk, &ns);
if (!parseNamespaceStatus.isOK()) {
@@ -90,10 +94,18 @@ StatusWith<SplitChunkRequest> SplitChunkRequest::parseFromConfigCommand(const BS
}
}
+ string shardName;
+ auto parseShardNameStatus = bsonExtractStringField(cmdObj, kShardName, &shardName);
+ if (!parseShardNameStatus.isOK()) {
+ return parseShardNameStatus;
+ }
+
auto request = SplitChunkRequest(NamespaceString(ns),
std::move(epoch),
std::move(chunkRangeStatus.getValue()),
- std::move(splitPoints));
+ std::move(splitPoints),
+ std::move(shardName));
Status validationStatus = request._validate();
if (!validationStatus.isOK()) {
return validationStatus;
@@ -122,6 +134,7 @@ void SplitChunkRequest::appendAsConfigCommand(BSONObjBuilder* cmdBuilder) {
splitPointsArray.append(splitPoint);
}
}
+ cmdBuilder->append(kShardName, _shardName);
}
const NamespaceString& SplitChunkRequest::getNamespace() const {
@@ -140,6 +153,10 @@ const vector<BSONObj>& SplitChunkRequest::getSplitPoints() const {
return _splitPoints;
}
+const string& SplitChunkRequest::getShardName() const {
+ return _shardName;
+}
+
Status SplitChunkRequest::_validate() {
if (!getNamespace().isValid()) {
return Status(ErrorCodes::InvalidNamespace,
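
Parsing and re-serialization now round-trip the shard name; a sketch using the accessors added above, with cmdObj shaped as in the request tests:

    auto swRequest = SplitChunkRequest::parseFromConfigCommand(cmdObj);
    if (swRequest.isOK()) {
        BSONObjBuilder cmdBuilder;
        swRequest.getValue().appendAsConfigCommand(&cmdBuilder);  // includes the "shard" field
        invariant(swRequest.getValue().getShardName() == "shard0000");
    }
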
diff --git a/src/mongo/s/request_types/split_chunk_request_type.h b/src/mongo/s/request_types/split_chunk_request_type.h
index 79099b170af..35a4b767923 100644
--- a/src/mongo/s/request_types/split_chunk_request_type.h
+++ b/src/mongo/s/request_types/split_chunk_request_type.h
@@ -47,7 +47,8 @@ public:
SplitChunkRequest(NamespaceString nss,
OID epoch,
ChunkRange chunkRange,
- std::vector<BSONObj> splitPoints);
+ std::vector<BSONObj> splitPoints,
+ std::string shardName);
/**
* Parses the provided BSON content as the internal _configsvrSplitChunk command, and if
@@ -58,7 +59,8 @@ public:
* collEpoch: <OID epoch>,
* min: <BSONObj chunkToSplitMin>,
* max: <BSONObj chunkToSplitMax>,
- * splitPoints: [<BSONObj key>, ...]
+ * splitPoints: [<BSONObj key>, ...],
+ * shard: <string shard>
* }
*/
static StatusWith<SplitChunkRequest> parseFromConfigCommand(const BSONObj& cmdObj);
@@ -80,6 +82,7 @@ public:
const OID& getEpoch() const;
const ChunkRange& getChunkRange() const;
const std::vector<BSONObj>& getSplitPoints() const;
+ const std::string& getShardName() const;
private:
/**
@@ -92,6 +95,7 @@ private:
OID _epoch;
ChunkRange _chunkRange;
std::vector<BSONObj> _splitPoints;
+ std::string _shardName;
};
} // namespace mongo