author     Nathan Myers <nathan.myers@10gen.com>  2016-12-05 10:45:16 -0500
committer  Nathan Myers <nathan.myers@10gen.com>  2016-12-05 10:45:16 -0500
commit     9b403d89bb81064a9d4813724d075c51b0121e97 (patch)
tree       b4b689c5ad238a24213194c314d973def3583fdc
parent     2d235202ad823bb0d1b0d99cf55c1589e57d67b9 (diff)
download   mongo-9b403d89bb81064a9d4813724d075c51b0121e97.tar.gz
SERVER-26562 Move CommitChunkMigration cmd impl to catalog, prep for unit tests
-rw-r--r--  src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp  297
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager.h  21
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_impl.cpp  276
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_impl.h  11
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.cpp  11
-rw-r--r--  src/mongo/s/catalog/sharding_catalog_manager_mock.h  10
-rw-r--r--  src/mongo/s/catalog/type_chunk.h  4
-rw-r--r--  src/mongo/s/request_types/commit_chunk_migration_request_test.cpp  48
-rw-r--r--  src/mongo/s/request_types/commit_chunk_migration_request_type.cpp  67
-rw-r--r--  src/mongo/s/request_types/commit_chunk_migration_request_type.h  43
10 files changed, 446 insertions, 342 deletions
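
In short: the _configsvrCommitChunkMigration command no longer performs the epoch/chunk checks and applyOps construction itself; after parsing the request (and, when the shard lacks the distributed lock, verifying that the balancer still holds it), it delegates to the new ShardingCatalogManager::commitChunkMigration. A condensed sketch of the new command body, with error handling elided (all names are taken from the first file below):

// Condensed from the updated run() in configsvr_commit_chunk_migration_command.cpp;
// error handling and the optional balancer dist-lock check are omitted here.
auto commitRequest =
    uassertStatusOK(CommitChunkMigrationRequest::createFromCommand(nss, cmdObj));
StatusWith<BSONObj> response = Grid::get(txn)->catalogManager()->commitChunkMigration(
    txn,
    nss,
    commitRequest.getMigratedChunk(),
    commitRequest.getControlChunk(),
    commitRequest.getCollectionEpoch(),
    commitRequest.getFromShard(),
    commitRequest.getToShard());
result.appendElements(uassertStatusOK(response));
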
diff --git a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
index 0db1f4eae09..10da0444a4a 100644
--- a/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
+++ b/src/mongo/db/s/config/configsvr_commit_chunk_migration_command.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/base/status_with.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -38,20 +39,21 @@
#include "mongo/db/s/chunk_move_write_concern_options.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/catalog/sharding_catalog_manager.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_locks.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/commit_chunk_migration_request_type.h"
+#include "mongo/util/fail_point.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
-MONGO_FP_DECLARE(migrationCommitError);
-MONGO_FP_DECLARE(migrationCommitVersionError);
+MONGO_FP_DECLARE(migrationCommitError); // delete this in 3.5
/**
* This command takes the chunk being migrated ("migratedChunk") and generates a new version for it
@@ -71,12 +73,12 @@ MONGO_FP_DECLARE(migrationCommitVersionError);
* Command Format:
* {
* _configsvrCommitChunkMigration: <database>.<collection>,
- * migratedChunk: {min: <min_value>, max: <max_value>},
- * controlChunk: {min: <min_value>, max: <max_value>}, (optional)
* fromShard: "<from_shard_name>",
* toShard: "<to_shard_name>",
- * fromShardCollectionVersion: <chunk_version>,
- * shardHasDistributedLock: true/false,
+ * migratedChunk: {min: <min_value>, max: <max_value>, etc. },
+ * controlChunk: {min: <min_value>, max: <max_value>, etc. }, (optional)
+ * fromShardCollectionVersion: { shardVersionField: <version> }, (for backward compatibility only)
+ * shardHasDistributedLock: true/false (for backward compatibility only)
* }
*
* Returns:
@@ -121,158 +123,18 @@ public:
}
/**
- * Checks that the epoch in the version the shard sent with the command matches the epoch of the
- * collection version found on the config server. It is possible for a migration to end up
- * running partly without the protection of the distributed lock. This function checks that the
- * collection has not been dropped and recreated since the migration began, unbeknown to the
- * shard when the command was sent.
- */
- void checkCollectionVersionEpoch(OperationContext* txn,
- const NamespaceString& nss,
- const ChunkRange& chunkRange,
- const ChunkVersion& shardCollectionVersion) {
- auto findResponse = uassertStatusOK(
- Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- NamespaceString(ChunkType::ConfigNS),
- BSON(ChunkType::ns() << nss.ns()),
- BSONObj(),
- 1));
-
- if (MONGO_FAIL_POINT(migrationCommitVersionError)) {
- uassert(ErrorCodes::StaleEpoch,
- "failpoint 'migrationCommitVersionError' generated error",
- false);
- }
-
- uassert(ErrorCodes::IncompatibleShardingMetadata,
- str::stream()
- << "Could not find any chunks for collection '"
- << nss.ns()
- << "'. The collection has been dropped since the migration began. Aborting "
- << "migration commit for chunk ("
- << redact(chunkRange.toString())
- << ").",
- !findResponse.docs.empty());
-
- ChunkType chunk = uassertStatusOK(ChunkType::fromBSON(findResponse.docs.front()));
-
- uassert(ErrorCodes::StaleEpoch,
- str::stream() << "The collection '" << nss.ns()
- << "' has been dropped and recreated since the migration began."
- << " The config server's collection version epoch is now '"
- << chunk.getVersion().epoch().toString()
- << "', but the shard's is "
- << shardCollectionVersion.epoch().toString()
- << "'. Aborting migration commit for chunk ("
- << chunkRange.toString()
- << ").",
- chunk.getVersion().hasEqualEpoch(shardCollectionVersion));
- }
-
- static void checkChunkIsOnShard(OperationContext* txn,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const ShardId& shard) {
- BSONObj chunkQuery =
- BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max
- << ChunkType::shard()
- << shard);
- // Must use local read concern because we're going to perform subsequent writes.
- auto findResponse = uassertStatusOK(
- Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- NamespaceString(ChunkType::ConfigNS),
- chunkQuery,
- BSONObj(),
- 1));
- uassert(40165,
- str::stream()
- << "Could not find the chunk ("
- << chunkQuery.toString()
- << ") on the shard. Cannot execute the migration commit with invalid chunks.",
- !findResponse.docs.empty());
- }
-
- BSONObj makeCommitChunkApplyOpsCommand(
- const NamespaceString& nss,
- const ChunkRange& migratedChunkRange,
- const boost::optional<ChunkRange>& controlChunkRange,
- const ChunkVersion newMigratedChunkVersion,
- const boost::optional<ChunkVersion> newControlChunkVersion,
- StringData toShard,
- StringData fromShard) {
- // Update migratedChunk's version and shard.
- BSONArrayBuilder updates;
- {
- BSONObjBuilder op;
- op.append("op", "u");
- op.appendBool("b", false); // No upserting
- op.append("ns", ChunkType::ConfigNS);
-
- BSONObjBuilder n(op.subobjStart("o"));
- n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin()));
- newMigratedChunkVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
- n.append(ChunkType::ns(), nss.ns());
- n.append(ChunkType::min(), migratedChunkRange.getMin());
- n.append(ChunkType::max(), migratedChunkRange.getMax());
- n.append(ChunkType::shard(), toShard);
- n.done();
-
- BSONObjBuilder q(op.subobjStart("o2"));
- q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunkRange.getMin()));
- q.done();
-
- updates.append(op.obj());
- }
-
- // If we have a controlChunk, update its chunk version.
- if (controlChunkRange) {
- BSONObjBuilder op;
- op.append("op", "u");
- op.appendBool("b", false);
- op.append("ns", ChunkType::ConfigNS);
-
- BSONObjBuilder n(op.subobjStart("o"));
- n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin()));
- newControlChunkVersion->addToBSON(n, ChunkType::DEPRECATED_lastmod());
- n.append(ChunkType::ns(), nss.ns());
- n.append(ChunkType::min(), controlChunkRange->getMin());
- n.append(ChunkType::max(), controlChunkRange->getMax());
- n.append(ChunkType::shard(), fromShard);
- n.done();
-
- BSONObjBuilder q(op.subobjStart("o2"));
- q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunkRange->getMin()));
- q.done();
-
- updates.append(op.obj());
- }
-
- // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will
- // fail because of the GlobalWrite lock CommitChunkMigration already holds. Replication will
- // not be able to take the lock it requires.
- return BSON("applyOps" << updates.arr());
- }
-
- /**
* Assures that the balancer still holds the collection distributed lock for this collection. If
- * it no longer does, uassert, because we don't know if the collection state has changed -- e.g.
+ * it no longer does, fail because we don't know if the collection state has changed -- e.g.
 * whether it was/is dropping, whether another incompatible migration is running, etc.
*/
- void checkBalancerHasDistLock(OperationContext* txn,
- const NamespaceString& nss,
- const ChunkRange& chunkRange) {
+ static Status checkBalancerHasDistLock(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkType& chunk) {
auto balancerDistLockProcessID =
Grid::get(txn)->catalogClient(txn)->getDistLockManager()->getProcessID();
// Must use local read concern because we're going to perform subsequent writes.
- auto lockQueryResponse = uassertStatusOK(
+ auto lockQueryResponseWith =
Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
txn,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
@@ -280,21 +142,27 @@ public:
NamespaceString(LocksType::ConfigNS),
BSON(LocksType::process(balancerDistLockProcessID) << LocksType::name(nss.ns())),
BSONObj(),
- boost::none));
+ boost::none);
+ if (!lockQueryResponseWith.isOK()) {
+ return lockQueryResponseWith.getStatus();
+ }
- invariant(lockQueryResponse.docs.size() <= 1);
+ invariant(lockQueryResponseWith.getValue().docs.size() <= 1);
if (MONGO_FAIL_POINT(migrationCommitError)) {
- lockQueryResponse.docs.clear();
+ lockQueryResponseWith.getValue().docs.clear();
}
- uassert(ErrorCodes::BalancerLostDistributedLock,
+ if (lockQueryResponseWith.getValue().docs.size() != 1) {
+ return Status(
+ ErrorCodes::BalancerLostDistributedLock,
str::stream() << "The distributed lock for collection '" << nss.ns()
<< "' was lost by the balancer since this migration began. Cannot "
<< "proceed with the migration commit for chunk ("
- << chunkRange.toString()
- << ") because it could corrupt other operations.",
- lockQueryResponse.docs.size() == 1);
+ << chunk.getRange().toString()
+ << ") because it could corrupt other operations.");
+ }
+ return Status::OK();
}
bool run(OperationContext* txn,
@@ -303,108 +171,31 @@ public:
int options,
std::string& errmsg,
BSONObjBuilder& result) override {
+
const NamespaceString nss = NamespaceString(parseNs(dbName, cmdObj));
- CommitChunkMigrationRequest commitChunkMigrationRequest =
+ auto commitRequest =
uassertStatusOK(CommitChunkMigrationRequest::createFromCommand(nss, cmdObj));
- // Run operations under a nested lock as a hack to prevent yielding. When query/applyOps
- // commands are called, they will take a second lock, and the PlanExecutor will be unable to
- // yield.
- //
- // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions
- // for migrated chunks are generated within the command. Therefore it cannot be allowed to
- // yield between generating the ChunkVersion and committing it to the database with
- // applyOps.
- Lock::GlobalWrite firstGlobalWriteLock(txn->lockState());
-
- if (!commitChunkMigrationRequest.shardHasDistributedLock()) {
- checkBalancerHasDistLock(txn, nss, commitChunkMigrationRequest.getMigratedChunkRange());
+ if (!commitRequest.shardHasDistributedLock()) {
+ auto check = checkBalancerHasDistLock(txn, nss, commitRequest.getMigratedChunk());
+ if (!check.isOK()) {
+ return appendCommandStatus(result, check);
+ }
}
- checkCollectionVersionEpoch(txn,
- nss,
- commitChunkMigrationRequest.getMigratedChunkRange(),
- commitChunkMigrationRequest.getFromShardCollectionVersion());
-
- // Check that migratedChunk and controlChunk are where they should be, on fromShard.
- checkChunkIsOnShard(txn,
- nss,
- commitChunkMigrationRequest.getMigratedChunkRange().getMin(),
- commitChunkMigrationRequest.getMigratedChunkRange().getMax(),
- commitChunkMigrationRequest.getFromShard());
-
- if (commitChunkMigrationRequest.hasControlChunkRange()) {
- checkChunkIsOnShard(txn,
- nss,
- commitChunkMigrationRequest.getControlChunkRange().getMin(),
- commitChunkMigrationRequest.getControlChunkRange().getMax(),
- commitChunkMigrationRequest.getFromShard());
+ StatusWith<BSONObj> response = Grid::get(txn)->catalogManager()->commitChunkMigration(
+ txn,
+ nss,
+ commitRequest.getMigratedChunk(),
+ commitRequest.getControlChunk(),
+ commitRequest.getCollectionEpoch(),
+ commitRequest.getFromShard(),
+ commitRequest.getToShard());
+ if (!response.isOK()) {
+ return appendCommandStatus(result, response.getStatus());
}
-
- // Generate the new chunk version (CV). Query the current max CV of the collection. Use the
- // incremented major version of the result returned. Migrating chunk's minor version will
- // be 0, control chunk's minor version will be 1 (if control chunk is present).
-
- // Must use local read concern because we're going to perform subsequent writes.
- auto findResponse = uassertStatusOK(
- Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- NamespaceString(ChunkType::ConfigNS),
- BSON("ns" << nss.ns()),
- BSON(ChunkType::DEPRECATED_lastmod << -1),
- 1));
-
- std::vector<BSONObj> chunksVector = findResponse.docs;
- uassert(40164,
- str::stream() << "Tried to find max chunk version for collection '" << nss.ns()
- << ", but found no chunks",
- !chunksVector.empty());
-
- ChunkVersion currentMaxVersion =
- ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
-
- // Generate the new versions of migratedChunk and controlChunk.
- ChunkVersion newMigratedChunkVersion =
- ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch());
- boost::optional<ChunkVersion> newControlChunkVersion = boost::none;
- boost::optional<ChunkRange> newControlChunkRange = boost::none;
- if (commitChunkMigrationRequest.hasControlChunkRange()) {
- newControlChunkVersion =
- ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch());
- newControlChunkRange = commitChunkMigrationRequest.getControlChunkRange();
- }
-
- auto applyOpsCommandResponse =
- Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
- txn,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- nss.db().toString(),
- makeCommitChunkApplyOpsCommand(
- nss,
- commitChunkMigrationRequest.getMigratedChunkRange(),
- newControlChunkRange,
- newMigratedChunkVersion,
- newControlChunkVersion,
- commitChunkMigrationRequest.getToShard().toString(),
- commitChunkMigrationRequest.getFromShard().toString()),
- Shard::RetryPolicy::kIdempotent);
-
- if (!applyOpsCommandResponse.isOK()) {
- return appendCommandStatus(result, applyOpsCommandResponse.getStatus());
- }
-
- if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) {
- return appendCommandStatus(result, applyOpsCommandResponse.getValue().commandStatus);
- }
-
- newMigratedChunkVersion.appendWithFieldForCommands(&result, "migratedChunkVersion");
- if (commitChunkMigrationRequest.hasControlChunkRange()) {
- newControlChunkVersion->appendWithFieldForCommands(&result, "controlChunkVersion");
- }
-
+ result.appendElements(response.getValue());
return true;
}
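
For orientation, a minimal sketch of how the donor-shard side would assemble this command using the request helper exercised by the unit tests later in this commit. The namespace, shard ids, key bounds, and version are placeholders, not values from this change, and the snippet assumes the includes used by the files in this commit:

BSONObjBuilder cmdBuilder;
ChunkType migratedChunk;
migratedChunk.setMin(BSON("x" << 0));   // placeholder shard-key bounds
migratedChunk.setMax(BSON("x" << 10));
CommitChunkMigrationRequest::appendAsCommand(&cmdBuilder,
                                             NamespaceString("db.coll"),      // placeholder
                                             ShardId("shard0000"),            // fromShard
                                             ShardId("shard0001"),            // toShard
                                             migratedChunk,
                                             boost::none,                     // no control chunk
                                             ChunkVersion(1, 2, OID::gen()),  // fromShardCollectionVersion
                                             false /* shardHasDistributedLock */);
BSONObj cmdObj = cmdBuilder.obj();
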
diff --git a/src/mongo/s/catalog/sharding_catalog_manager.h b/src/mongo/s/catalog/sharding_catalog_manager.h
index 63f242d3598..da7709369b9 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager.h
@@ -25,7 +25,6 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
#pragma once
#include <string>
@@ -43,6 +42,7 @@ class OperationContext;
class RemoteCommandTargeter;
class ShardId;
class ShardType;
+class ChunkType;
class Status;
template <typename T>
class StatusWith;
@@ -138,8 +138,8 @@ public:
const ChunkRange& range) = 0;
/**
- * Updates chunk metadata in config.chunks collection to reflect the given chunk being split
- * into multiple smaller chunks based on the specified split points.
+ * Updates metadata in config.chunks collection to show the given chunk as split
+ * into smaller chunks at the specified split points.
*/
virtual Status commitChunkSplit(OperationContext* txn,
const NamespaceString& ns,
@@ -149,8 +149,8 @@ public:
const std::string& shardName) = 0;
/**
- * Updates chunk metadata in config.chunks collection to reflect the given chunks being merged
- * into a single larger chunk based on the specified boundaries of the smaller chunks.
+ * Updates metadata in config.chunks collection so the chunks with given boundaries are seen
+ * merged into a single larger chunk.
*/
virtual Status commitChunkMerge(OperationContext* txn,
const NamespaceString& ns,
@@ -159,6 +159,17 @@ public:
const std::string& shardName) = 0;
/**
+ * Updates metadata in config.chunks collection to show the given chunk in its new shard.
+ */
+ virtual StatusWith<BSONObj> commitChunkMigration(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkType& migratedChunk,
+ const boost::optional<ChunkType>& controlChunk,
+ const OID& collectionEpoch,
+ const ShardId& fromShard,
+ const ShardId& toShard) = 0;
+
+ /**
* Append information about the connection pools owned by the CatalogManager.
*/
virtual void appendConnectionStats(executor::ConnectionPoolStats* stats) = 0;
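
A hedged sketch of consuming the new method's result; swCommitResult is a placeholder name for the StatusWith<BSONObj> an implementation returns, and the field names come from the implementation later in this commit:

if (!swCommitResult.isOK()) {
    // Failures surfaced by the implementation below include StaleEpoch (collection
    // dropped/recreated), IncompatibleShardingMetadata, and errors from the
    // chunk-location checks or the applyOps write.
    return swCommitResult.getStatus();
}
BSONObj versions = swCommitResult.getValue();
// versions["migratedChunkVersion"] holds the migrated chunk's new version;
// versions["controlChunkVersion"] is present only when a control chunk was passed.
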
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
index 17b3235256c..c519dba4b74 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.cpp
@@ -34,6 +34,7 @@
#include <iomanip>
+#include "mongo/base/error_codes.h"
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobjbuilder.h"
@@ -61,6 +62,7 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_config_version.h"
#include "mongo/s/catalog/type_database.h"
@@ -86,6 +88,7 @@
namespace mongo {
MONGO_FP_DECLARE(dontUpsertShardIdentityOnNewShards);
+MONGO_FP_DECLARE(migrationCommitVersionError);
using std::string;
using std::vector;
@@ -1473,6 +1476,279 @@ Status ShardingCatalogManagerImpl::commitChunkMerge(OperationContext* txn,
return applyOpsStatus;
}
+namespace {
+
+/**
+ * Checks that the epoch in the version the shard sent with the command matches the epoch of the
+ * collection version found on the config server. It is possible for a migration to end up
+ * running partly without the protection of the distributed lock. This function checks that the
+ * collection has not been dropped and recreated since the migration began, unbeknown to the
+ * shard when the command was sent.
+ */
+static Status checkCollectionVersionEpoch(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkType& aChunk,
+ const OID& collectionEpoch) {
+ auto findResponseWith =
+ Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON(ChunkType::ns() << nss.ns()),
+ BSONObj(),
+ 1);
+ if (!findResponseWith.isOK()) {
+ return findResponseWith.getStatus();
+ }
+
+ if (MONGO_FAIL_POINT(migrationCommitVersionError)) {
+ uassert(ErrorCodes::StaleEpoch,
+ "failpoint 'migrationCommitVersionError' generated error",
+ false);
+ }
+
+ if (findResponseWith.getValue().docs.empty()) {
+ return Status(
+ ErrorCodes::IncompatibleShardingMetadata,
+ str::stream()
+ << "Could not find any chunks for collection '"
+ << nss.ns()
+ << "'. The collection has been dropped since the migration began. Aborting"
+ " migration commit for chunk ("
+ << redact(aChunk.getRange().toString())
+ << ").");
+ }
+
+ auto chunkWith = ChunkType::fromBSON(findResponseWith.getValue().docs.front());
+ if (!chunkWith.isOK()) {
+ return chunkWith.getStatus();
+ } else if (chunkWith.getValue().getVersion().epoch() != collectionEpoch) {
+ return Status(ErrorCodes::StaleEpoch,
+ str::stream() << "The collection '" << nss.ns()
+ << "' has been dropped and recreated since the migration began."
+ " The config server's collection version epoch is now '"
+ << chunkWith.getValue().getVersion().epoch().toString()
+ << "', but the shard's is "
+ << collectionEpoch.toString()
+ << "'. Aborting migration commit for chunk ("
+ << redact(aChunk.getRange().toString())
+ << ").");
+ }
+ return Status::OK();
+}
+
+static Status checkChunkIsOnShard(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& min,
+ const BSONObj& max,
+ const ShardId& shard) {
+ BSONObj chunkQuery =
+ BSON(ChunkType::ns() << nss.ns() << ChunkType::min() << min << ChunkType::max() << max
+ << ChunkType::shard()
+ << shard);
+ // Must use local read concern because we're going to perform subsequent writes.
+ auto findResponseWith =
+ Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ chunkQuery,
+ BSONObj(),
+ 1);
+ if (!findResponseWith.isOK()) {
+ return findResponseWith.getStatus();
+ }
+ if (findResponseWith.getValue().docs.empty()) {
+ return Status(
+ ErrorCodes::Error(40165),
+ str::stream()
+ << "Could not find the chunk ("
+ << chunkQuery.toString()
+ << ") on the shard. Cannot execute the migration commit with invalid chunks.");
+ }
+ return Status::OK();
+}
+
+static BSONObj makeCommitChunkApplyOpsCommand(const NamespaceString& nss,
+ const ChunkType& migratedChunk,
+ const boost::optional<ChunkType>& controlChunk,
+ StringData fromShard,
+ StringData toShard) {
+
+ // Update migratedChunk's version and shard.
+ BSONArrayBuilder updates;
+ {
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", false); // No upserting
+ op.append("ns", ChunkType::ConfigNS);
+
+ BSONObjBuilder n(op.subobjStart("o"));
+ n.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin()));
+ migratedChunk.getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod());
+ n.append(ChunkType::ns(), nss.ns());
+ n.append(ChunkType::min(), migratedChunk.getMin());
+ n.append(ChunkType::max(), migratedChunk.getMax());
+ n.append(ChunkType::shard(), toShard);
+ n.done();
+
+ BSONObjBuilder q(op.subobjStart("o2"));
+ q.append(ChunkType::name(), ChunkType::genID(nss.ns(), migratedChunk.getMin()));
+ q.done();
+
+ updates.append(op.obj());
+ }
+
+ // If we have a controlChunk, update its chunk version.
+ if (controlChunk) {
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", false);
+ op.append("ns", ChunkType::ConfigNS);
+
+ BSONObjBuilder n(op.subobjStart("o"));
+ n.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin()));
+ controlChunk->getVersion().addToBSON(n, ChunkType::DEPRECATED_lastmod());
+ n.append(ChunkType::ns(), nss.ns());
+ n.append(ChunkType::min(), controlChunk->getMin());
+ n.append(ChunkType::max(), controlChunk->getMax());
+ n.append(ChunkType::shard(), fromShard);
+ n.done();
+
+ BSONObjBuilder q(op.subobjStart("o2"));
+ q.append(ChunkType::name(), ChunkType::genID(nss.ns(), controlChunk->getMin()));
+ q.done();
+
+ updates.append(op.obj());
+ }
+
+ // Do not give applyOps a write concern. If applyOps tries to wait for replication, it will
+ // fail because of the GlobalWrite lock CommitChunkMigration already holds. Replication will
+ // not be able to take the lock it requires.
+ return BSON("applyOps" << updates.arr());
+}
+
+} // namespace
+
+StatusWith<BSONObj> ShardingCatalogManagerImpl::commitChunkMigration(
+ OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkType& migratedChunk,
+ const boost::optional<ChunkType>& controlChunk,
+ const OID& collectionEpoch,
+ const ShardId& fromShard,
+ const ShardId& toShard) {
+
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations.
+ // TODO(SERVER-25359): Replace with a collection-specific lock map to allow splits/merges/
+ // move chunks on different collections to proceed in parallel.
+ // (Note: This is not needed while we have a global lock, taken here only for consistency.)
+ Lock::ExclusiveLock lk(txn->lockState(), _kChunkOpLock);
+
+ // Acquire GlobalLock in MODE_X twice to prevent yielding.
+ // Run operations under a nested lock as a hack to prevent yielding. When query/applyOps
+ // commands are called, they will take a second lock, and the PlanExecutor will be unable to
+ // yield.
+ //
+ // ConfigSvrCommitChunkMigration commands must be run serially because the new ChunkVersions
+ // for migrated chunks are generated within the command. Therefore it cannot be allowed to
+ // yield between generating the ChunkVersion and committing it to the database with
+ // applyOps.
+
+ Lock::GlobalWrite firstGlobalWriteLock(txn->lockState());
+
+ // Ensure that the epoch passed in still matches the real state of the database.
+
+ auto epochCheck = checkCollectionVersionEpoch(txn, nss, migratedChunk, collectionEpoch);
+ if (!epochCheck.isOK()) {
+ return epochCheck;
+ }
+
+ // Check that migratedChunk and controlChunk are where they should be, on fromShard.
+
+ auto migratedOnShard =
+ checkChunkIsOnShard(txn, nss, migratedChunk.getMin(), migratedChunk.getMax(), fromShard);
+ if (!migratedOnShard.isOK()) {
+ return migratedOnShard;
+ }
+
+ if (controlChunk) {
+ auto controlOnShard = checkChunkIsOnShard(
+ txn, nss, controlChunk->getMin(), controlChunk->getMax(), fromShard);
+ if (!controlOnShard.isOK()) {
+ return controlOnShard;
+ }
+ }
+
+ // Must use local read concern because we will perform subsequent writes.
+ auto findResponse = Grid::get(txn)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ NamespaceString(ChunkType::ConfigNS),
+ BSON("ns" << nss.ns()),
+ BSON(ChunkType::DEPRECATED_lastmod << -1),
+ 1);
+ if (!findResponse.isOK()) {
+ return findResponse.getStatus();
+ }
+
+ std::vector<BSONObj> chunksVector = std::move(findResponse.getValue().docs);
+ if (chunksVector.empty()) {
+ return Status(ErrorCodes::Error(40164),
+ str::stream() << "Tried to find max chunk version for collection '"
+ << nss.ns()
+ << ", but found no chunks");
+ }
+
+ ChunkVersion currentMaxVersion =
+ ChunkVersion::fromBSON(chunksVector.front(), ChunkType::DEPRECATED_lastmod());
+
+ // Use the incremented major version of the result returned.
+
+ // Generate the new versions of migratedChunk and controlChunk.
+ // Migrating chunk's minor version will be 0.
+ ChunkType newMigratedChunk = migratedChunk;
+ newMigratedChunk.setVersion(
+ ChunkVersion(currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch()));
+
+ // Control chunk's minor version will be 1 (if control chunk is present).
+ boost::optional<ChunkType> newControlChunk = boost::none;
+ if (controlChunk) {
+ newControlChunk = controlChunk.get();
+ newControlChunk->setVersion(
+ ChunkVersion(currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch()));
+ }
+
+ auto command = makeCommitChunkApplyOpsCommand(
+ nss, newMigratedChunk, newControlChunk, fromShard.toString(), toShard.toString());
+
+ StatusWith<Shard::CommandResponse> applyOpsCommandResponse =
+ Grid::get(txn)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ txn,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ nss.db().toString(),
+ command,
+ Shard::RetryPolicy::kIdempotent);
+
+ if (!applyOpsCommandResponse.isOK()) {
+ return applyOpsCommandResponse.getStatus();
+ }
+ if (!applyOpsCommandResponse.getValue().commandStatus.isOK()) {
+ return applyOpsCommandResponse.getValue().commandStatus;
+ }
+
+ BSONObjBuilder result;
+ newMigratedChunk.getVersion().appendWithFieldForCommands(&result, "migratedChunkVersion");
+ if (controlChunk) {
+ newControlChunk->getVersion().appendWithFieldForCommands(&result, "controlChunkVersion");
+ }
+ return result.obj();
+}
+
void ShardingCatalogManagerImpl::appendConnectionStats(executor::ConnectionPoolStats* stats) {
_executorForAddShard->appendConnectionStats(stats);
}
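
The version assignment above in a nutshell, as a small sketch with an illustrative starting version (the (7, 3) values and collectionEpoch are placeholders):

// Suppose the collection's current max chunk version on the config server is (7, 3, E).
ChunkVersion currentMaxVersion(7, 3, collectionEpoch);
// The migrated chunk receives the next major version with minor version 0 ...
ChunkVersion newMigratedVersion(
    currentMaxVersion.majorVersion() + 1, 0, currentMaxVersion.epoch());  // (8, 0, E)
// ... and the control chunk, when one exists, the same major version with minor version 1.
ChunkVersion newControlVersion(
    currentMaxVersion.majorVersion() + 1, 1, currentMaxVersion.epoch());  // (8, 1, E)
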
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_impl.h b/src/mongo/s/catalog/sharding_catalog_manager_impl.h
index 5294e31ace7..744bc30b31f 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_impl.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_impl.h
@@ -43,6 +43,9 @@ class DatabaseType;
class RemoteCommandTargeter;
class ShardingCatalogClient;
class VersionType;
+class ShardId;
+template <typename T>
+class StatusWith;
namespace executor {
class TaskExecutor;
@@ -100,6 +103,14 @@ public:
const std::vector<BSONObj>& chunkBoundaries,
const std::string& shardName) override;
+ StatusWith<BSONObj> commitChunkMigration(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkType& migratedChunk,
+ const boost::optional<ChunkType>& controlChunk,
+ const OID& collectionEpoch,
+ const ShardId& fromShard,
+ const ShardId& toShard) override;
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
index cc53dfbada1..44a0e884bb8 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.cpp
@@ -98,6 +98,17 @@ Status ShardingCatalogManagerMock::commitChunkMerge(OperationContext* txn,
return {ErrorCodes::InternalError, "Method not implemented"};
}
+StatusWith<BSONObj> ShardingCatalogManagerMock::commitChunkMigration(
+ OperationContext* txn,
+ const NamespaceString&,
+ const ChunkType&,
+ const boost::optional<ChunkType>&,
+ const OID& collectionEpoch,
+ const ShardId&,
+ const ShardId&) {
+ return {ErrorCodes::InternalError, "Method not implemented"};
+}
+
void ShardingCatalogManagerMock::appendConnectionStats(executor::ConnectionPoolStats* stats) {}
Status ShardingCatalogManagerMock::initializeConfigDatabaseIfNeeded(OperationContext* txn) {
diff --git a/src/mongo/s/catalog/sharding_catalog_manager_mock.h b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
index 23ab27831da..9a970c3a7be 100644
--- a/src/mongo/s/catalog/sharding_catalog_manager_mock.h
+++ b/src/mongo/s/catalog/sharding_catalog_manager_mock.h
@@ -28,6 +28,8 @@
#pragma once
+#include <boost/optional.hpp>
+#include "mongo/base/status_with.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/s/catalog/sharding_catalog_manager.h"
@@ -82,6 +84,14 @@ public:
const std::vector<BSONObj>& chunkBoundaries,
const std::string& shardName) override;
+ StatusWith<BSONObj> commitChunkMigration(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkType& migratedChunk,
+ const boost::optional<ChunkType>& controlChunk,
+ const OID& collectionEpoch,
+ const ShardId& fromShard,
+ const ShardId& toShard) override;
+
void appendConnectionStats(executor::ConnectionPoolStats* stats) override;
Status initializeConfigDatabaseIfNeeded(OperationContext* txn) override;
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index a7a0e1f7e30..de468e21f79 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -151,6 +151,10 @@ public:
}
void setMax(const BSONObj& max);
+ ChunkRange getRange() const {
+ return ChunkRange(getMin(), getMax());
+ }
+
bool isVersionSet() const {
return _version.is_initialized();
}
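
A tiny usage sketch of the new helper (the key bounds are placeholders):

ChunkType chunk;
chunk.setMin(BSON("x" << 0));
chunk.setMax(BSON("x" << 10));
ChunkRange range = chunk.getRange();  // equivalent to ChunkRange(chunk.getMin(), chunk.getMax())
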
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
index f0563fa76e1..d9201693f2a 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_test.cpp
@@ -55,23 +55,23 @@ const char kConfigSvrCommitChunkMigration[] = "_configsvrCommitChunkMigration";
TEST(CommitChunkMigrationRequest, WithControlChunk) {
BSONObjBuilder builder;
- ChunkType migratedChunkType;
- migratedChunkType.setMin(kKey0);
- migratedChunkType.setMax(kKey1);
+ ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
- ChunkType controlChunkTypeTemp;
- controlChunkTypeTemp.setMin(kKey2);
- controlChunkTypeTemp.setMax(kKey3);
- boost::optional<ChunkType> controlChunkType = std::move(controlChunkTypeTemp);
+ ChunkType migratedChunk;
+ migratedChunk.setMin(kKey0);
+ migratedChunk.setMax(kKey1);
- ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
+ ChunkType controlChunk;
+ controlChunk.setMin(kKey2);
+ controlChunk.setMax(kKey3);
+ boost::optional<ChunkType> controlChunkOpt = controlChunk;
CommitChunkMigrationRequest::appendAsCommand(&builder,
kNamespaceString,
kShardId0,
kShardId1,
- migratedChunkType,
- controlChunkType,
+ migratedChunk,
+ controlChunkOpt,
fromShardCollectionVersion,
kShardHasDistributedLock);
@@ -83,21 +83,21 @@ TEST(CommitChunkMigrationRequest, WithControlChunk) {
ASSERT_EQ(kNamespaceString, request.getNss());
ASSERT_EQ(kShardId0, request.getFromShard());
ASSERT_EQ(kShardId1, request.getToShard());
- ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunkRange().getMin());
- ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunkRange().getMax());
- ASSERT(request.hasControlChunkRange());
- ASSERT_BSONOBJ_EQ(kKey2, request.getControlChunkRange().getMin());
- ASSERT_BSONOBJ_EQ(kKey3, request.getControlChunkRange().getMax());
- ASSERT_EQ(fromShardCollectionVersion, request.getFromShardCollectionVersion());
+ ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunk().getMin());
+ ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunk().getMax());
+ ASSERT(request.getControlChunk());
+ ASSERT_BSONOBJ_EQ(kKey2, request.getControlChunk()->getMin());
+ ASSERT_BSONOBJ_EQ(kKey3, request.getControlChunk()->getMax());
+ ASSERT_EQ(fromShardCollectionVersion.epoch(), request.getCollectionEpoch());
ASSERT_EQ(kShardHasDistributedLock, request.shardHasDistributedLock());
}
TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
BSONObjBuilder builder;
- ChunkType migratedChunkType;
- migratedChunkType.setMin(kKey0);
- migratedChunkType.setMax(kKey1);
+ ChunkType migratedChunk;
+ migratedChunk.setMin(kKey0);
+ migratedChunk.setMax(kKey1);
ChunkVersion fromShardCollectionVersion(1, 2, OID::gen());
@@ -105,7 +105,7 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
kNamespaceString,
kShardId0,
kShardId1,
- migratedChunkType,
+ migratedChunk,
boost::none,
fromShardCollectionVersion,
kShardHasDistributedLock);
@@ -118,10 +118,10 @@ TEST(CommitChunkMigrationRequest, WithoutControlChunk) {
ASSERT_EQ(kNamespaceString, request.getNss());
ASSERT_EQ(kShardId0, request.getFromShard());
ASSERT_EQ(kShardId1, request.getToShard());
- ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunkRange().getMin());
- ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunkRange().getMax());
- ASSERT(!request.hasControlChunkRange());
- ASSERT_EQ(fromShardCollectionVersion, request.getFromShardCollectionVersion());
+ ASSERT_BSONOBJ_EQ(kKey0, request.getMigratedChunk().getMin());
+ ASSERT_BSONOBJ_EQ(kKey1, request.getMigratedChunk().getMax());
+ ASSERT(!request.getControlChunk());
+ ASSERT_EQ(fromShardCollectionVersion.epoch(), request.getCollectionEpoch());
ASSERT_EQ(kShardHasDistributedLock, request.shardHasDistributedLock());
}
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
index 94cfd070e7c..4b9fbfd149d 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.cpp
@@ -44,15 +44,22 @@ const char kFromShardCollectionVersion[] = "fromShardCollectionVersion";
const char kShardHasDistributedLock[] = "shardHasDistributedLock";
/**
- * Attempts to parse a ChunkRange from "field" in "source".
+ * Attempts to parse a (range-only!) ChunkType from "field" in "source".
*/
-StatusWith<ChunkRange> extractChunkRange(const BSONObj& source, StringData field) {
+StatusWith<ChunkType> extractChunk(const BSONObj& source, StringData field) {
BSONElement fieldElement;
auto status = bsonExtractTypedField(source, field, BSONType::Object, &fieldElement);
if (!status.isOK())
return status;
- return ChunkRange::fromBSON(fieldElement.Obj());
+ auto rangeWith = ChunkRange::fromBSON(fieldElement.Obj());
+ if (!rangeWith.isOK())
+ return rangeWith.getStatus();
+
+ ChunkType chunk;
+ chunk.setMin(rangeWith.getValue().getMin());
+ chunk.setMax(rangeWith.getValue().getMax());
+ return chunk;
}
/**
@@ -79,12 +86,12 @@ StatusWith<ShardId> extractShardId(const BSONObj& source, StringData field) {
StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromCommand(
const NamespaceString& nss, const BSONObj& obj) {
- auto migratedChunkRange = extractChunkRange(obj, kMigratedChunk);
- if (!migratedChunkRange.isOK()) {
- return migratedChunkRange.getStatus();
+ auto migratedChunk = extractChunk(obj, kMigratedChunk);
+ if (!migratedChunk.isOK()) {
+ return migratedChunk.getStatus();
}
- CommitChunkMigrationRequest request(nss, std::move(migratedChunkRange.getValue()));
+ CommitChunkMigrationRequest request(nss, std::move(migratedChunk.getValue()));
{
auto fromShard = extractShardId(obj, kFromShard);
@@ -107,23 +114,23 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC
{
// controlChunk is optional, so parse it if present.
if (obj.hasField(kControlChunk)) {
- auto controlChunkRange = extractChunkRange(obj, kControlChunk);
- if (!controlChunkRange.isOK()) {
- return controlChunkRange.getStatus();
+ auto controlChunk = extractChunk(obj, kControlChunk);
+ if (!controlChunk.isOK()) {
+ return controlChunk.getStatus();
}
- request._controlChunkRange = std::move(controlChunkRange.getValue());
+ request._controlChunk = std::move(controlChunk.getValue());
}
}
{
auto statusWithChunkVersion =
ChunkVersion::parseFromBSONWithFieldForCommands(obj, kFromShardCollectionVersion);
- if (statusWithChunkVersion.isOK()) {
- request._fromShardCollectionVersion = std::move(statusWithChunkVersion.getValue());
- } else if (statusWithChunkVersion != ErrorCodes::NoSuchKey) {
+ if (!statusWithChunkVersion.isOK()) {
return statusWithChunkVersion.getStatus();
}
+
+ request._collectionEpoch = statusWithChunkVersion.getValue().epoch();
}
{
@@ -137,37 +144,27 @@ StatusWith<CommitChunkMigrationRequest> CommitChunkMigrationRequest::createFromC
return request;
}
-void CommitChunkMigrationRequest::appendAsCommand(
- BSONObjBuilder* builder,
- const NamespaceString& nss,
- const ShardId& fromShard,
- const ShardId& toShard,
- const ChunkType& migratedChunkType,
- const boost::optional<ChunkType>& controlChunkType,
- const ChunkVersion& fromShardCollectionVersion,
- const bool& shardHasDistributedLock) {
+void CommitChunkMigrationRequest::appendAsCommand(BSONObjBuilder* builder,
+ const NamespaceString& nss,
+ const ShardId& fromShard,
+ const ShardId& toShard,
+ const ChunkType& migratedChunk,
+ const boost::optional<ChunkType>& controlChunk,
+ const ChunkVersion& fromShardCollectionVersion,
+ const bool& shardHasDistributedLock) {
invariant(builder->asTempObj().isEmpty());
invariant(nss.isValid());
builder->append(kConfigSvrCommitChunkMigration, nss.ns());
builder->append(kFromShard, fromShard.toString());
builder->append(kToShard, toShard.toString());
- builder->append(kMigratedChunk, migratedChunkType.toBSON());
+ builder->append(kMigratedChunk, migratedChunk.toBSON());
fromShardCollectionVersion.appendWithFieldForCommands(builder, kFromShardCollectionVersion);
builder->append(kShardHasDistributedLock, shardHasDistributedLock);
- if (controlChunkType) {
- builder->append(kControlChunk, controlChunkType->toBSON());
+ if (controlChunk) {
+ builder->append(kControlChunk, controlChunk->toBSON());
}
}
-const ChunkRange& CommitChunkMigrationRequest::getControlChunkRange() const {
- invariant(_controlChunkRange);
- return _controlChunkRange.get();
-}
-
-CommitChunkMigrationRequest::CommitChunkMigrationRequest(const NamespaceString& nss,
- const ChunkRange& range)
- : _nss(nss), _migratedChunkRange(range) {}
-
} // namespace mongo
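
A minimal parse sketch for the config-server side, assuming cmdObj was built as in the appendAsCommand example near the top of this page. Note that the parsed chunks carry only min/max (extractChunk discards everything else) and that only the epoch of fromShardCollectionVersion is retained:

auto swRequest = CommitChunkMigrationRequest::createFromCommand(
    NamespaceString("db.coll"), cmdObj);  // placeholder namespace
if (!swRequest.isOK()) {
    return swRequest.getStatus();  // sketch assumes an enclosing function returning Status
}
auto request = std::move(swRequest.getValue());
ChunkRange migratedRange = request.getMigratedChunk().getRange();  // min/max only
OID collectionEpoch = request.getCollectionEpoch();                // epoch of fromShardCollectionVersion
if (request.getControlChunk()) {
    ChunkRange controlRange = request.getControlChunk()->getRange();
}
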
diff --git a/src/mongo/s/request_types/commit_chunk_migration_request_type.h b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
index 461278e429e..c1f97f5c7fc 100644
--- a/src/mongo/s/request_types/commit_chunk_migration_request_type.h
+++ b/src/mongo/s/request_types/commit_chunk_migration_request_type.h
@@ -38,8 +38,11 @@ namespace mongo {
/**
* Creates and parses commit chunk migration command BSON objects.
*/
-class CommitChunkMigrationRequest {
-public:
+struct CommitChunkMigrationRequest {
+
+ CommitChunkMigrationRequest(const NamespaceString& nss, const ChunkType& chunk)
+ : _nss(nss), _migratedChunk(chunk), _shardHasDistributedLock() {}
+
/**
* Parses the input command and produces a request corresponding to its arguments.
*/
@@ -63,35 +66,26 @@ public:
const NamespaceString& getNss() const {
return _nss;
}
-
const ShardId& getFromShard() const {
return _fromShard;
}
-
const ShardId& getToShard() const {
return _toShard;
}
-
- const ChunkRange& getMigratedChunkRange() const {
- return _migratedChunkRange;
- }
-
- const ChunkRange& getControlChunkRange() const;
-
- bool hasControlChunkRange() {
- return bool(_controlChunkRange);
+ const ChunkType& getMigratedChunk() const {
+ return _migratedChunk;
}
-
- const ChunkVersion& getFromShardCollectionVersion() const {
- return _fromShardCollectionVersion;
+ const boost::optional<ChunkType>& getControlChunk() const {
+ return _controlChunk;
}
bool shardHasDistributedLock() {
return _shardHasDistributedLock;
}
-private:
- CommitChunkMigrationRequest(const NamespaceString& nss, const ChunkRange& range);
+ const OID& getCollectionEpoch() {
+ return _collectionEpoch;
+ }
// The collection for which this request applies.
NamespaceString _nss;
@@ -102,17 +96,16 @@ private:
// The recipient shard name.
ShardId _toShard;
- // Range of migrated chunk being moved.
- ChunkRange _migratedChunkRange;
-
- // Range of control chunk being moved, if it exists.
- boost::optional<ChunkRange> _controlChunkRange;
+ // The chunk being moved.
+ ChunkType _migratedChunk;
- // Collection version of the source shard.
- ChunkVersion _fromShardCollectionVersion;
+ // A chunk on the shard moved from, if any remain.
+ boost::optional<ChunkType> _controlChunk;
// Flag to indicate whether the shard has the distlock.
bool _shardHasDistributedLock;
+
+ OID _collectionEpoch;
};
} // namespace mongo