author     Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com>   2022-04-25 13:19:16 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>        2022-04-25 14:08:47 +0000
commit     36d3af6e81247713f53065769e73f4f0fa8e622d (patch)
tree       773b9067805b081570745bdae549609c999e1ada /src/mongo/db
parent     62e65260473152d8655d4a910505e6404067b907 (diff)
download   mongo-36d3af6e81247713f53065769e73f4f0fa8e622d.tar.gz
SERVER-64817 Compute missing bound of moveRange within MigrationSourceManager
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/s/active_migrations_registry.cpp | 78
-rw-r--r--  src/mongo/db/s/active_migrations_registry.h | 9
-rw-r--r--  src/mongo/db/s/active_migrations_registry_test.cpp | 45
-rw-r--r--  src/mongo/db/s/balancer/balancer.cpp | 6
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp | 9
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 106
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 26
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp | 108
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp | 227
-rw-r--r--  src/mongo/db/s/migration_source_manager.h | 16
-rw-r--r--  src/mongo/db/s/move_timing_helper.cpp | 14
-rw-r--r--  src/mongo/db/s/move_timing_helper.h | 14
-rw-r--r--  src/mongo/db/s/shardsvr_move_range_command.cpp | 76
13 files changed, 387 insertions, 347 deletions
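
The core of this change lands in migration_source_manager.cpp: when a moveRange request arrives without a `max` bound, the donor now derives one itself by asking autoSplitVector for split points above `min` inside the owning chunk, taking the first split key if the chunk is splittable and falling back to the chunk's own max otherwise, so that small or jumbo chunks are moved whole. The fragment below is a minimal standalone sketch of just that selection rule; pickMaxBound and the string-encoded keys are illustrative stand-ins, not MongoDB APIs.

// Standalone sketch of the bound-selection rule introduced by this commit.
// pickMaxBound is a hypothetical helper; in the real code the split keys come
// from autoSplitVector() and the fallback is the owning chunk's max bound.
#include <iostream>
#include <string>
#include <vector>

std::string pickMaxBound(const std::vector<std::string>& splitKeys,
                         const std::string& owningChunkMax) {
    if (!splitKeys.empty()) {
        // Splittable chunk: the first split point becomes the moveRange max bound.
        return splitKeys.front();
    }
    // Chunk too small (or jumbo) to split: move it in its entirety.
    return owningChunkMax;
}

int main() {
    std::cout << pickMaxBound({"{x: 50}"}, "{x: 100}") << "\n";  // -> {x: 50}
    std::cout << pickMaxBound({}, "{x: 100}") << "\n";           // -> {x: 100}
    return 0;
}

Taking only the first split point keeps the migrated range at roughly the configured max chunk size, which is why the diff invokes autoSplitVector with a limit of one split key.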
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp
index 3ed54b597f6..baaa763daaa 100644
--- a/src/mongo/db/s/active_migrations_registry.cpp
+++ b/src/mongo/db/s/active_migrations_registry.cpp
@@ -93,11 +93,12 @@ void ActiveMigrationsRegistry::unlock(StringData reason) {
}
StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
- OperationContext* opCtx, const MoveChunkRequest& args) {
+ OperationContext* opCtx, const ShardsvrMoveRange& args) {
stdx::unique_lock<Latch> ul(_mutex);
opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] {
- return !_migrationsBlocked && !_activeSplitMergeChunkStates.count(args.getNss());
+ return !_migrationsBlocked &&
+ !_activeSplitMergeChunkStates.count(args.getCommandParameter());
});
if (_activeReceiveChunkState) {
@@ -105,24 +106,23 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk(
}
if (_activeMoveChunkState) {
- if (_activeMoveChunkState->args == args) {
- LOGV2(5004704,
- "registerDonateChunk ",
- "keys"_attr = ChunkRange(args.getMinKey(), args.getMaxKey()).toString(),
- "toShardId"_attr = args.getToShardId(),
- logAttrs(args.getNss()));
+ auto activeMoveChunkStateBSON = _activeMoveChunkState->args.toBSON({});
+
+ if (activeMoveChunkStateBSON.woCompare(args.toBSON({})) == 0) {
+ LOGV2(6386800,
+ "Registering new chunk donation",
+ logAttrs(args.getCommandParameter()),
+ "min"_attr = args.getMin(),
+ "max"_attr = args.getMax(),
+ "toShardId"_attr = args.getToShard());
return {ScopedDonateChunk(nullptr, false, _activeMoveChunkState->notification)};
}
- LOGV2(5004700,
- "registerDonateChunk",
- "currentKeys"_attr = ChunkRange(_activeMoveChunkState->args.getMinKey(),
- _activeMoveChunkState->args.getMaxKey())
- .toString(),
- "currentToShardId"_attr = _activeMoveChunkState->args.getToShardId(),
- "newKeys"_attr = ChunkRange(args.getMinKey(), args.getMaxKey()).toString(),
- "newToShardId"_attr = args.getToShardId(),
- logAttrs(args.getNss()));
+ LOGV2(6386801,
+ "Rejecting donate chunk due to conflicting migration in progress",
+ logAttrs(args.getCommandParameter()),
+ "runningMigration"_attr = activeMoveChunkStateBSON,
+ "requestedMigration"_attr = args.toBSON({}));
return _activeMoveChunkState->constructErrorStatus();
}
@@ -147,13 +147,10 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk(
}
if (_activeMoveChunkState) {
- LOGV2(5004701,
- "registerReceiveChunk ",
- "currentKeys"_attr = ChunkRange(_activeMoveChunkState->args.getMinKey(),
- _activeMoveChunkState->args.getMaxKey())
- .toString(),
- "currentToShardId"_attr = _activeMoveChunkState->args.getToShardId(),
- logAttrs(_activeMoveChunkState->args.getNss()));
+ LOGV2(6386802,
+ "Rejecting receive chunk due to conflicting donate chunk in progress",
+ logAttrs(_activeMoveChunkState->args.getCommandParameter()),
+ "runningMigration"_attr = _activeMoveChunkState->args.toBSON({}));
return _activeMoveChunkState->constructErrorStatus();
}
@@ -167,7 +164,8 @@ StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMerge
stdx::unique_lock<Latch> ul(_mutex);
opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] {
- return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) &&
+ return !(_activeMoveChunkState &&
+ _activeMoveChunkState->args.getCommandParameter() == nss) &&
!_activeSplitMergeChunkStates.count(nss);
});
@@ -181,7 +179,7 @@ StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMerge
boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() {
stdx::lock_guard<Latch> lk(_mutex);
if (_activeMoveChunkState) {
- return _activeMoveChunkState->args.getNss();
+ return _activeMoveChunkState->args.getCommandParameter();
}
return boost::none;
@@ -193,7 +191,7 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex
stdx::lock_guard<Latch> lk(_mutex);
if (_activeMoveChunkState) {
- nss = _activeMoveChunkState->args.getNss();
+ nss = _activeMoveChunkState->args.getCommandParameter();
}
}
@@ -218,12 +216,12 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex
void ActiveMigrationsRegistry::_clearDonateChunk() {
stdx::lock_guard<Latch> lk(_mutex);
invariant(_activeMoveChunkState);
- LOGV2(5004702,
- "clearDonateChunk ",
- "currentKeys"_attr = ChunkRange(_activeMoveChunkState->args.getMinKey(),
- _activeMoveChunkState->args.getMaxKey())
- .toString(),
- "currentToShardId"_attr = _activeMoveChunkState->args.getToShardId());
+ LOGV2(6386803,
+ "Unregistering donate chunk",
+ logAttrs(_activeMoveChunkState->args.getCommandParameter()),
+ "min"_attr = _activeMoveChunkState->args.getMin().get_value_or(BSONObj()),
+ "max"_attr = _activeMoveChunkState->args.getMax().get_value_or(BSONObj()),
+ "toShardId"_attr = _activeMoveChunkState->args.getToShard());
_activeMoveChunkState.reset();
_chunkOperationsStateChangedCV.notify_all();
}
@@ -247,12 +245,14 @@ void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss)
}
Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const {
- return {ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to start new balancer operation because this shard is "
- "currently donating chunk "
- << ChunkRange(args.getMinKey(), args.getMaxKey()).toString()
- << " for namespace " << args.getNss().ns() << " to "
- << args.getToShardId()};
+ std::string errMsg = fmt::format(
+ "Unable to start new balancer operation because this shard is currently donating range "
+ "'{}{}' for namespace {} to shard {}",
+ (args.getMin() ? "min: " + args.getMin()->toString() + " - " : ""),
+ (args.getMax() ? "max: " + args.getMax()->toString() : ""),
+ args.getCommandParameter().ns(),
+ args.getToShard().toString());
+ return {ErrorCodes::ConflictingOperationInProgress, std::move(errMsg)};
}
Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const {
diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h
index f6a99185d4a..f7616cc0f9f 100644
--- a/src/mongo/db/s/active_migrations_registry.h
+++ b/src/mongo/db/s/active_migrations_registry.h
@@ -33,7 +33,8 @@
#include "mongo/db/s/migration_session_id.h"
#include "mongo/platform/mutex.h"
-#include "mongo/s/request_types/move_chunk_request.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/request_types/move_range_request_gen.h"
#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -87,7 +88,7 @@ public:
* Otherwise returns a ConflictingOperationInProgress error.
*/
StatusWith<ScopedDonateChunk> registerDonateChunk(OperationContext* opCtx,
- const MoveChunkRequest& args);
+ const ShardsvrMoveRange& args);
/**
* If there are no migrations or split/merges running on this shard, registers an active receive
@@ -132,7 +133,7 @@ private:
// Describes the state of a currently active moveChunk operation
struct ActiveMoveChunkState {
- ActiveMoveChunkState(MoveChunkRequest inArgs)
+ ActiveMoveChunkState(ShardsvrMoveRange inArgs)
: args(std::move(inArgs)), notification(std::make_shared<Notification<Status>>()) {}
/**
@@ -141,7 +142,7 @@ private:
Status constructErrorStatus() const;
// Exact arguments of the currently active operation
- MoveChunkRequest args;
+ ShardsvrMoveRange args;
// Notification event that will be signaled when the currently active operation completes
std::shared_ptr<Notification<Status>> notification;
diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp
index f637e3c4e1f..389728c8ace 100644
--- a/src/mongo/db/s/active_migrations_registry_test.cpp
+++ b/src/mongo/db/s/active_migrations_registry_test.cpp
@@ -58,27 +58,22 @@ protected:
ServiceContext::UniqueOperationContext _opCtx;
};
-MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) {
- const ChunkVersion chunkVersion(1, 2, OID::gen(), Timestamp(1, 1));
-
- BSONObjBuilder builder;
- MoveChunkRequest::appendAsCommand(
- &builder,
- nss,
- chunkVersion,
- ShardId("shard0001"),
- ShardId("shard0002"),
- ChunkRange(BSON("Key" << -100), BSON("Key" << 100)),
- 1024,
- MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff),
- true,
- MoveChunkRequest::ForceJumbo::kDoNotForce);
- return assertGet(MoveChunkRequest::createFromCommand(nss, builder.obj()));
+ShardsvrMoveRange createMoveRangeRequest(const NamespaceString& nss,
+ const OID& epoch = OID::gen()) {
+ const ShardId fromShard = ShardId("shard0001");
+ const long long maxChunkSizeBytes = 1024;
+ ShardsvrMoveRange req(nss, fromShard, maxChunkSizeBytes);
+ req.setEpoch(epoch);
+ req.getMoveRangeRequestBase().setToShard(ShardId("shard0002"));
+ req.setMaxChunkSizeBytes(1024);
+ req.getMoveRangeRequestBase().setMin(BSON("Key" << -100));
+ req.getMoveRangeRequestBase().setMax(BSON("Key" << 100));
+ return req;
}
TEST_F(MoveChunkRegistration, ScopedDonateChunkMoveConstructorAndAssignment) {
auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"))));
ASSERT(originalScopedDonateChunk.mustExecute());
ScopedDonateChunk movedScopedDonateChunk(std::move(originalScopedDonateChunk));
@@ -97,7 +92,7 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
const NamespaceString nss("TestDB", "TestColl");
auto originalScopedDonateChunk =
- assertGet(_registry.registerDonateChunk(operationContext(), createMoveChunkRequest(nss)));
+ assertGet(_registry.registerDonateChunk(operationContext(), createMoveRangeRequest(nss)));
ASSERT_EQ(nss.ns(), _registry.getActiveDonateChunkNss()->ns());
@@ -107,10 +102,10 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) {
TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgress) {
auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl1"))));
+ operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl1"))));
auto secondScopedDonateChunkStatus = _registry.registerDonateChunk(
- operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl2")));
+ operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl2")));
ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
secondScopedDonateChunkStatus.getStatus());
@@ -118,12 +113,14 @@ TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgre
}
TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) {
+ const auto epoch = OID::gen();
+
auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"), epoch)));
ASSERT(originalScopedDonateChunk.mustExecute());
auto secondScopedDonateChunk = assertGet(_registry.registerDonateChunk(
- operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))));
+ operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"), epoch)));
ASSERT(!secondScopedDonateChunk.mustExecute());
originalScopedDonateChunk.signalComplete({ErrorCodes::InternalError, "Test error"});
@@ -171,7 +168,7 @@ TEST_F(MoveChunkRegistration, TestBlockingDonateChunk) {
// 6. Now that we're woken up by the registry thread, let's attempt to start to donate.
// This will block and call the lambda set on the baton above.
auto scopedDonateChunk = _registry.registerDonateChunk(
- opCtx.get(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+ opCtx.get(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl")));
ASSERT_OK(scopedDonateChunk.getStatus());
scopedDonateChunk.getValue().signalComplete(Status::OK());
@@ -262,7 +259,7 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) {
auto result = stdx::async(stdx::launch::async, [&] {
// 2. Start a migration so that the registry lock will block when acquired.
auto scopedDonateChunk = _registry.registerDonateChunk(
- operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")));
+ operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl")));
ASSERT_OK(scopedDonateChunk.getStatus());
// 3. Signal the registry locking thread that the registry is ready to be locked.
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp
index 21b87bdba17..dc40cd77e5d 100644
--- a/src/mongo/db/s/balancer/balancer.cpp
+++ b/src/mongo/db/s/balancer/balancer.cpp
@@ -413,7 +413,7 @@ Status Balancer::moveRange(OperationContext* opCtx,
ShardsvrMoveRange shardSvrRequest(nss);
shardSvrRequest.setDbName(NamespaceString::kAdminDb);
- shardSvrRequest.setMoveRangeRequest(request.getMoveRangeRequest());
+ shardSvrRequest.setMoveRangeRequestBase(request.getMoveRangeRequestBase());
shardSvrRequest.setMaxChunkSizeBytes(maxChunkSize);
shardSvrRequest.setFromShard(fromShardId);
shardSvrRequest.setEpoch(coll.getEpoch());
@@ -982,7 +982,7 @@ int Balancer::_moveChunks(OperationContext* opCtx,
return _commandScheduler->requestMoveChunk(opCtx, migrateInfo, settings);
}
- MoveRangeRequest requestBase(migrateInfo.to);
+ MoveRangeRequestBase requestBase(migrateInfo.to);
requestBase.setWaitForDelete(balancerConfig->waitForDelete());
requestBase.setMin(migrateInfo.minKey);
if (!feature_flags::gNoMoreAutoSplitter.isEnabled(
@@ -993,7 +993,7 @@ int Balancer::_moveChunks(OperationContext* opCtx,
ShardsvrMoveRange shardSvrRequest(migrateInfo.nss);
shardSvrRequest.setDbName(NamespaceString::kAdminDb);
- shardSvrRequest.setMoveRangeRequest(requestBase);
+ shardSvrRequest.setMoveRangeRequestBase(requestBase);
shardSvrRequest.setMaxChunkSizeBytes(maxChunkSizeBytes);
shardSvrRequest.setFromShard(migrateInfo.from);
shardSvrRequest.setEpoch(coll.getEpoch());
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index a4ba1410014..a2f2dc56a29 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -178,10 +178,11 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
ShardsvrMoveRange shardsvrRequest(kNss);
shardsvrRequest.setDbName(NamespaceString::kAdminDb);
shardsvrRequest.setFromShard(kShardId0);
- auto& moveRangeRequest = shardsvrRequest.getMoveRangeRequest();
- moveRangeRequest.setToShard(kShardId1);
- moveRangeRequest.setMin({});
- moveRangeRequest.setMax({});
+ shardsvrRequest.setMaxChunkSizeBytes(1024);
+ auto& moveRangeRequestBase = shardsvrRequest.getMoveRangeRequestBase();
+ moveRangeRequestBase.setToShard(kShardId1);
+ moveRangeRequestBase.setMin({});
+ moveRangeRequestBase.setMax({});
auto networkResponseFuture = launchAsync([&]() {
onCommand(
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 1b9f825d3d9..c073ec383e6 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -225,8 +225,8 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
continue;
}
- auto const& minKey = cloner->_args.getMinKey();
- auto const& maxKey = cloner->_args.getMaxKey();
+ auto const& minKey = cloner->_args.getMin().get();
+ auto const& maxKey = cloner->_args.getMax().get();
auto const& shardKeyPattern = cloner->_shardKeyPattern;
if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) {
@@ -250,17 +250,20 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
}
}
-MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
- const BSONObj& shardKeyPattern,
- ConnectionString donorConnStr,
- HostAndPort recipientHost)
- : _args(std::move(request)),
+MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(
+ const ShardsvrMoveRange& request,
+ const WriteConcernOptions& writeConcern,
+ const BSONObj& shardKeyPattern,
+ ConnectionString donorConnStr,
+ HostAndPort recipientHost)
+ : _args(request),
+ _writeConcern(writeConcern),
_shardKeyPattern(shardKeyPattern),
- _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
- _args.getToShardId().toString())),
+ _sessionId(MigrationSessionId::generate(_args.getFromShard().toString(),
+ _args.getToShard().toString())),
_donorConnStr(std::move(donorConnStr)),
_recipientHost(std::move(recipientHost)),
- _forceJumbo(_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kDoNotForce) {}
+ _forceJumbo(_args.getForceJumbo() != ForceJumbo::kDoNotForce) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -276,10 +279,7 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
_sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>(
- opCtx,
- _args.getNss(),
- ChunkRange(_args.getMinKey(), _args.getMaxKey()),
- _shardKeyPattern.getKeyPattern());
+ opCtx, nss(), ChunkRange(getMin(), getMax()), _shardKeyPattern.getKeyPattern());
// Prime up the session migration source if there are oplog entries to migrate.
_sessionCatalogSource->fetchNextOplog(opCtx);
@@ -310,19 +310,24 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx,
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
+ const bool isThrottled = _args.getSecondaryThrottle();
+ MigrationSecondaryThrottleOptions secondaryThrottleOptions = isThrottled
+ ? MigrationSecondaryThrottleOptions::createWithWriteConcern(_writeConcern)
+ : MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff);
+
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
- _args.getNss(),
+ nss(),
migrationId,
lsid,
txnNumber,
_sessionId,
_donorConnStr,
- _args.getFromShardId(),
- _args.getToShardId(),
- _args.getMinKey(),
- _args.getMaxKey(),
+ _args.getFromShard(),
+ _args.getToShard(),
+ getMin(),
+ getMax(),
_shardKeyPattern.toBSON(),
- _args.getSecondaryThrottle());
+ secondaryThrottleOptions);
// Commands sent to shards that accept writeConcern must always have writeConcern. So if the
// StartChunkCloneRequest didn't add writeConcern (from secondaryThrottle), then we add the
@@ -355,8 +360,7 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate(
invariant(!opCtx->lockState()->isLocked());
// If this migration is manual migration that specified "force", enter the critical section
// immediately. This means the entire cloning phase will be done under the critical section.
- if (_jumboChunkCloneState &&
- _args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) {
+ if (_jumboChunkCloneState && _args.getForceJumbo() == ForceJumbo::kForceManual) {
return Status::OK();
}
@@ -368,7 +372,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
invariant(_state == kCloning);
invariant(!opCtx->lockState()->isLocked());
if (_jumboChunkCloneState && _forceJumbo) {
- if (_args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) {
+ if (_args.getForceJumbo() == ForceJumbo::kForceManual) {
auto status = _checkRecipientCloningStatus(opCtx, kMaxWaitToCommitCloneForJumboChunk);
if (!status.isOK()) {
return status;
@@ -385,7 +389,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte
auto responseStatus = _callRecipient(opCtx, [&] {
BSONObjBuilder builder;
- builder.append(kRecvChunkCommit, _args.getNss().ns());
+ builder.append(kRecvChunkCommit, nss().ns());
builder.append("acquireCSOnRecipient", acquireCSOnRecipient);
_sessionId.append(&builder);
return builder.obj();
@@ -419,8 +423,8 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) noex
break;
case kCloning: {
const auto status =
- _callRecipient(
- opCtx, createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId))
+ _callRecipient(opCtx,
+ createRequestWithSessionId(kRecvChunkAbort, nss(), _sessionId))
.getStatus();
if (!status.isOK()) {
LOGV2(21991,
@@ -439,13 +443,13 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) noex
}
bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& doc) {
- return isInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern);
+ return isInRange(doc, getMin(), getMax(), _shardKeyPattern);
}
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
const BSONObj& insertedDoc,
const repl::OpTime& opTime) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IX));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
if (idElement.eoo()) {
@@ -458,7 +462,7 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
- if (!isInRange(insertedDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
+ if (!isInRange(insertedDoc, getMin(), getMax(), _shardKeyPattern)) {
return;
}
@@ -480,7 +484,7 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
const BSONObj& postImageDoc,
const repl::OpTime& opTime,
const repl::OpTime& prePostImageOpTime) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IX));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IX));
BSONElement idElement = postImageDoc["_id"];
if (idElement.eoo()) {
@@ -493,13 +497,12 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
- if (!isInRange(postImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
+ if (!isInRange(postImageDoc, getMin(), getMax(), _shardKeyPattern)) {
// If the preImageDoc is not in range but the postImageDoc was, we know that the document
// has changed shard keys and no longer belongs in the chunk being cloned. We will model
// the deletion of the preImage document so that the destination chunk does not receive an
// outdated version of this document.
- if (preImageDoc &&
- isInRange(*preImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
+ if (preImageDoc && isInRange(*preImageDoc, getMin(), getMax(), _shardKeyPattern)) {
onDeleteOp(opCtx, *preImageDoc, opTime, prePostImageOpTime);
}
return;
@@ -522,7 +525,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
const BSONObj& deletedDocId,
const repl::OpTime& opTime,
const repl::OpTime&) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IX));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
if (idElement.eoo()) {
@@ -728,7 +731,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
const CollectionPtr& collection,
BSONArrayBuilder* arrBuilder) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS));
// If this chunk is too large to store records in _cloneLocs and the command args specify to
// attempt to move it, scan the collection directly.
@@ -748,7 +751,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx,
Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
Database* db,
BSONObjBuilder* builder) {
- dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS));
+ dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS));
std::list<BSONObj> deleteList;
std::list<BSONObj> updateList;
@@ -768,7 +771,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx,
updateList.splice(updateList.cbegin(), _reload);
}
- StringData ns = _args.getNss().ns().c_str();
+ StringData ns = nss().ns().c_str();
BSONArrayBuilder arrDel(builder->subarrayStart("deleted"));
auto noopFn = [](BSONObj idDoc, BSONObj* fullDoc) {
*fullDoc = idDoc;
@@ -869,14 +872,14 @@ MigrationChunkClonerSourceLegacy::_getIndexScanExecutor(
if (!shardKeyIdx) {
return {ErrorCodes::IndexNotFound,
str::stream() << "can't find index with prefix " << _shardKeyPattern.toBSON()
- << " in storeCurrentLocs for " << _args.getNss().ns()};
+ << " in storeCurrentLocs for " << nss().ns()};
}
// Assume both min and max non-empty, append MinKey's to make them fit chosen index
const KeyPattern kp(shardKeyIdx->keyPattern());
- BSONObj min = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMinKey(), false));
- BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false));
+ BSONObj min = Helpers::toKeyFormat(kp.extendRangeBound(getMin(), false));
+ BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(getMax(), false));
// We can afford to yield here because any change to the base data that we might miss is already
// being queued and will migrate in the 'transferMods' stage.
@@ -892,10 +895,10 @@ MigrationChunkClonerSourceLegacy::_getIndexScanExecutor(
}
Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) {
- AutoGetCollection collection(opCtx, _args.getNss(), MODE_IS);
+ AutoGetCollection collection(opCtx, nss(), MODE_IS);
if (!collection) {
return {ErrorCodes::NamespaceNotFound,
- str::stream() << "Collection " << _args.getNss().ns() << " does not exist."};
+ str::stream() << "Collection " << nss().ns() << " does not exist."};
}
auto swExec = _getIndexScanExecutor(
@@ -971,7 +974,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
if (!idIdx) {
return {ErrorCodes::IndexNotFound,
str::stream() << "can't find index '_id' in storeCurrentLocs for "
- << _args.getNss().ns()};
+ << nss().ns()};
}
averageObjectIdSize = idIdx->accessMethod()->getSpaceUsedBytes(opCtx) / totalRecs;
}
@@ -983,8 +986,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC
<< maxRecsWhenFull << ", the maximum chunk size is "
<< _args.getMaxChunkSizeBytes() << ", average document size is "
<< avgRecSize << ". Found " << recCount << " documents in chunk "
- << " ns: " << _args.getNss().ns() << " " << _args.getMinKey() << " -> "
- << _args.getMaxKey()};
+ << " ns: " << nss().ns() << " " << getMin() << " -> " << getMax()};
}
stdx::lock_guard<Latch> lk(_mutex);
@@ -1034,7 +1036,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
int iteration = 0;
while ((Date_t::now() - startTime) < maxTimeToWait) {
auto responseStatus = _callRecipient(
- opCtx, createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true));
+ opCtx, createRequestWithSessionId(kRecvChunkStatus, nss(), _sessionId, true));
if (!responseStatus.isOK()) {
return responseStatus.getStatus().withContext(
"Failed to contact recipient shard to monitor data transfer");
@@ -1142,12 +1144,12 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
<< migrationSessionIdStatus.getStatus().toString()};
}
- if (res["ns"].str() != _args.getNss().ns() ||
+ if (res["ns"].str() != nss().ns() ||
(res.hasField("fromShardId")
- ? (res["fromShardId"].str() != _args.getFromShardId().toString())
+ ? (res["fromShardId"].str() != _args.getFromShard().toString())
: (res["from"].str() != _donorConnStr.toString())) ||
- !res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 ||
- !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 ||
+ !res["min"].isABSONObj() || res["min"].Obj().woCompare(getMin()) != 0 ||
+ !res["max"].isABSONObj() || res["max"].Obj().woCompare(getMax()) != 0 ||
!_sessionId.matches(migrationSessionIdStatus.getValue())) {
// This can happen when the destination aborted the migration and received another
// recvChunk before this thread sees the transition to the abort state. This is
@@ -1158,7 +1160,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"Destination shard aborted migration because a new one is running"};
}
- if (_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kForceManual &&
+ if (_args.getForceJumbo() != ForceJumbo::kForceManual &&
(_memoryUsed > 500 * 1024 * 1024 ||
(_jumboChunkCloneState && MONGO_unlikely(failTooMuchMemoryUsed.shouldFail())))) {
// This is too much memory for us to use so we're going to abort the migration
@@ -1182,7 +1184,7 @@ boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigra
}
repl::OpTime opTimeToWaitIfWaitingForMajority;
- const ChunkRange range(_args.getMinKey(), _args.getMaxKey());
+ const ChunkRange range(getMin(), getMax());
while (_sessionCatalogSource->hasMoreOplog()) {
auto result = _sessionCatalogSource->getLastFetchedOplog();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 45bcdb12404..8c15fa7a0cb 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -42,7 +42,7 @@
#include "mongo/db/s/migration_session_id.h"
#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/platform/mutex.h"
-#include "mongo/s/request_types/move_chunk_request.h"
+#include "mongo/s/request_types/move_range_request_gen.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/net/hostandport.h"
@@ -88,7 +88,8 @@ class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource
MigrationChunkClonerSourceLegacy& operator=(const MigrationChunkClonerSourceLegacy&) = delete;
public:
- MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
+ MigrationChunkClonerSourceLegacy(const ShardsvrMoveRange& request,
+ const WriteConcernOptions& writeConcern,
const BSONObj& shardKeyPattern,
ConnectionString donorConnStr,
HostAndPort recipientHost);
@@ -205,6 +206,20 @@ public:
*/
std::shared_ptr<Notification<bool>> getNotificationForNextSessionMigrationBatch();
+ const NamespaceString& nss() {
+ return _args.getCommandParameter();
+ }
+
+ const BSONObj& getMin() {
+ invariant(_args.getMin());
+ return *_args.getMin();
+ }
+
+ const BSONObj& getMax() {
+ invariant(_args.getMax());
+ return *_args.getMax();
+ }
+
private:
friend class LogOpForShardingHandler;
friend class LogTransactionOperationsForShardingHandler;
@@ -307,8 +322,11 @@ private:
*/
Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait);
- // The original move chunk request
- const MoveChunkRequest _args;
+ // The original move range request
+ const ShardsvrMoveRange _args;
+
+ // The write concern associated with the move range
+ const WriteConcernOptions _writeConcern;
// The shard key associated with the namespace
const ShardKeyPattern _shardKeyPattern;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 0ec28d4b862..623524a545d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -200,21 +200,15 @@ protected:
* Shortcut to create BSON representation of a moveChunk request for the specified range with
* fixed kDonorConnStr and kRecipientConnStr, respectively.
*/
- static MoveChunkRequest createMoveChunkRequest(const ChunkRange& chunkRange) {
- BSONObjBuilder cmdBuilder;
- MoveChunkRequest::appendAsCommand(
- &cmdBuilder,
- kNss,
- ChunkVersion(1, 0, OID::gen(), Timestamp(1, 1)),
- kDonorConnStr.getSetName(),
- kRecipientConnStr.getSetName(),
- chunkRange,
- 1024 * 1024,
- MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault),
- false,
- MoveChunkRequest::ForceJumbo::kDoNotForce);
-
- return assertGet(MoveChunkRequest::createFromCommand(kNss, cmdBuilder.obj()));
+ static ShardsvrMoveRange createMoveRangeRequest(const ChunkRange& chunkRange) {
+ ShardsvrMoveRange req(kNss);
+ req.setEpoch(OID::gen());
+ req.setFromShard(ShardId(kDonorConnStr.getSetName()));
+ req.setMaxChunkSizeBytes(1024);
+ req.getMoveRangeRequestBase().setToShard(ShardId(kRecipientConnStr.getSetName()));
+ req.getMoveRangeRequestBase().setMin(chunkRange.getMin());
+ req.getMoveRangeRequestBase().setMax(chunkRange.getMax());
+ return req;
}
/**
@@ -278,11 +272,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
createShardedCollection(contents);
- MigrationChunkClonerSourceLegacy cloner(
- createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
{
auto futureStartClone = launchAsync([&]() {
@@ -387,11 +383,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, RemoveDuplicateDocuments) {
createShardedCollection(contents);
- MigrationChunkClonerSourceLegacy cloner(
- createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
{
auto futureStartClone = launchAsync([&]() {
@@ -479,11 +477,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, OneLargeDocumentTransferMods) {
createShardedCollection(contents);
- MigrationChunkClonerSourceLegacy cloner(
- createMoveChunkRequest(ChunkRange(BSON("X" << 1), BSON("X" << 100))),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 100)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
{
auto futureStartClone = launchAsync([&]() {
@@ -539,11 +539,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) {
createShardedCollection(contents);
- MigrationChunkClonerSourceLegacy cloner(
- createMoveChunkRequest(ChunkRange(BSON("X" << 1), BSON("X" << 1000000))),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 1000000)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
{
auto futureStartClone = launchAsync([&]() {
@@ -610,11 +612,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) {
}
TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) {
- MigrationChunkClonerSourceLegacy cloner(
- createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
cloner.cancelClone(operationContext());
@@ -628,11 +632,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) {
operationContext(), kNss.db().toString(), BSON("create" << kNss.coll())));
}
- MigrationChunkClonerSourceLegacy cloner(
- createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber));
cloner.cancelClone(operationContext());
@@ -646,11 +652,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) {
createShardedCollection(contents);
- MigrationChunkClonerSourceLegacy cloner(
- createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))),
- kShardKeyPattern,
- kDonorConnStr,
- kRecipientConnStr.getServers()[0]);
+ const ShardsvrMoveRange req =
+ createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200)));
+ MigrationChunkClonerSourceLegacy cloner(req,
+ WriteConcernOptions(),
+ kShardKeyPattern,
+ kDonorConnStr,
+ kRecipientConnStr.getServers()[0]);
{
auto futureStartClone = launchAsync([&]() {
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 72791d7d7a0..35f4d814295 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/read_concern.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/auto_split_vector.h"
#include "mongo/db/s/migration_chunk_cloner_source_legacy.h"
#include "mongo/db/s/migration_coordinator.h"
#include "mongo/db/s/migration_util.h"
@@ -106,6 +107,28 @@ void refreshRecipientRoutingTable(OperationContext* opCtx,
executor->scheduleRemoteCommand(request, noOp).getStatus().ignore();
}
+
+/*
+ * Taking into account the provided max chunk size, returns:
+ * - A `max` bound to perform split+move in case the chunk owning `min` is splittable.
+ * - The `max` bound of the chunk owning `min` in case it can't be split (too small or jumbo).
+ */
+BSONObj computeMaxBound(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& min,
+ const Chunk& owningChunk,
+ const ShardKeyPattern& skPattern,
+ const long long maxChunkSizeBytes) {
+ // TODO SERVER-64926 do not assume min always present
+ auto [splitKeys, _] = autoSplitVector(
+ opCtx, nss, skPattern.toBSON(), min, owningChunk.getMax(), maxChunkSizeBytes, 1);
+ if (splitKeys.size()) {
+ return std::move(splitKeys.front());
+ }
+
+ return owningChunk.getMax();
+}
+
MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep1);
MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep2);
MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep3);
@@ -135,44 +158,47 @@ std::shared_ptr<MigrationChunkClonerSource> MigrationSourceManager::getCurrentCl
}
MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
- MoveChunkRequest request,
+ ShardsvrMoveRange&& request,
+ WriteConcernOptions&& writeConcern,
ConnectionString donorConnStr,
HostAndPort recipientHost)
: _opCtx(opCtx),
- _args(std::move(request)),
+ _args(request),
+ _writeConcern(writeConcern),
_donorConnStr(std::move(donorConnStr)),
_recipientHost(std::move(recipientHost)),
_stats(ShardingStatistics::get(_opCtx)),
_critSecReason(BSON("command"
<< "moveChunk"
- << "fromShard" << _args.getFromShardId() << "toShard"
- << _args.getToShardId())),
+ << "fromShard" << _args.getFromShard() << "toShard"
+ << _args.getToShard())),
_acquireCSOnRecipient(feature_flags::gFeatureFlagMigrationRecipientCriticalSection.isEnabled(
serverGlobalParams.featureCompatibility)),
_moveTimingHelper(_opCtx,
"from",
- _args.getNss().ns(),
- _args.getMinKey(),
- _args.getMaxKey(),
+ _args.getCommandParameter().ns(),
+ _args.getMin(),
+ _args.getMax(),
6, // Total number of steps
&kEmptyErrMsgForMoveTimingHelper,
- _args.getToShardId(),
- _args.getFromShardId()) {
+ _args.getToShard(),
+ _args.getFromShard()) {
invariant(!_opCtx->lockState()->isLocked());
LOGV2(22016,
"Starting chunk migration donation {requestParameters} with expected collection epoch "
"{collectionEpoch}",
"Starting chunk migration donation",
- "requestParameters"_attr = redact(_args.toString()),
- "collectionEpoch"_attr = _args.getVersionEpoch());
+ "requestParameters"_attr = redact(_args.toBSON({})),
+ "collectionEpoch"_attr = _args.getEpoch());
_moveTimingHelper.done(1);
moveChunkHangAtStep1.pauseWhileSet();
// Make sure the latest shard version is recovered as of the time of the invocation of the
// command.
- onShardVersionMismatch(_opCtx, _args.getNss(), boost::none);
+ onShardVersionMismatch(_opCtx, nss(), boost::none);
+
const auto shardId = ShardingState::get(opCtx)->shardId();
// Complete any unfinished migration pending recovery
@@ -190,18 +216,18 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
// Snapshot the committed metadata from the time the migration starts
const auto [collectionMetadata, collectionUUID] = [&] {
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IS);
+ AutoGetCollection autoColl(_opCtx, nss(), MODE_IS);
uassert(ErrorCodes::InvalidOptions,
"cannot move chunks for a collection that doesn't exist",
autoColl.getCollection());
UUID collectionUUID = autoColl.getCollection()->uuid();
- auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss());
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, nss());
const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
auto optMetadata = csr->getCurrentMetadataIfKnown();
- uassert(StaleConfigInfo(_args.getNss(),
+ uassert(StaleConfigInfo(nss(),
ChunkVersion::IGNORED() /* receivedVersion */,
boost::none /* wantedVersion */,
shardId,
@@ -210,7 +236,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
optMetadata);
auto& metadata = *optMetadata;
- uassert(StaleConfigInfo(_args.getNss(),
+ uassert(StaleConfigInfo(nss(),
ChunkVersion::IGNORED() /* receivedVersion */,
ChunkVersion::UNSHARDED() /* wantedVersion */,
shardId,
@@ -234,36 +260,52 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
const auto collectionVersion = collectionMetadata.getCollVersion();
const auto shardVersion = collectionMetadata.getShardVersion();
- uassert(StaleConfigInfo(_args.getNss(),
+ uassert(StaleConfigInfo(nss(),
ChunkVersion::IGNORED() /* receivedVersion */,
shardVersion /* wantedVersion */,
shardId,
boost::none),
- str::stream() << "cannot move chunk " << _args.toString()
+ str::stream() << "cannot move chunk " << _args.toBSON({})
<< " because collection may have been dropped. "
<< "current epoch: " << collectionVersion.epoch()
- << ", cmd epoch: " << _args.getVersionEpoch(),
- _args.getVersionEpoch() == collectionVersion.epoch());
+ << ", cmd epoch: " << _args.getEpoch(),
+ _args.getEpoch() == collectionVersion.epoch());
- uassert(StaleConfigInfo(_args.getNss(),
+ uassert(StaleConfigInfo(nss(),
ChunkVersion::IGNORED() /* receivedVersion */,
shardVersion /* wantedVersion */,
shardId,
boost::none),
- str::stream() << "cannot move chunk " << _args.toString()
+ str::stream() << "cannot move chunk " << _args.toBSON({})
<< " because the shard doesn't contain any chunks",
shardVersion.majorVersion() > 0);
+ // Compute the max bound in case only `min` is set (moveRange)
+ if (!_args.getMax().is_initialized()) {
+ // TODO SERVER-64926 do not assume min always present
+ const auto& min = *_args.getMin();
+
+ const auto cm = collectionMetadata.getChunkManager();
+ const auto owningChunk = cm->findIntersectingChunkWithSimpleCollation(min);
+ const auto max = computeMaxBound(_opCtx,
+ nss(),
+ min,
+ owningChunk,
+ cm->getShardKeyPattern(),
+ _args.getMaxChunkSizeBytes());
+ _args.getMoveRangeRequestBase().setMax(max);
+ _moveTimingHelper.setMax(max);
+ }
+
const auto& keyPattern = collectionMetadata.getKeyPattern();
const bool validBounds = [&]() {
// Return true if the provided bounds respect the shard key format, false otherwise
const auto nFields = keyPattern.nFields();
- if (nFields != _args.getMinKey().nFields() || nFields != _args.getMaxKey().nFields()) {
+ if (nFields != (*_args.getMin()).nFields() || nFields != (*_args.getMax()).nFields()) {
return false;
}
- BSONObjIterator keyPatternIt(keyPattern), minIt(_args.getMinKey()),
- maxIt(_args.getMaxKey());
+ BSONObjIterator keyPatternIt(keyPattern), minIt(*_args.getMin()), maxIt(*_args.getMax());
while (keyPatternIt.more()) {
const auto keyPatternField = keyPatternIt.next().fieldNameStringData();
@@ -277,43 +319,43 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
return true;
}();
- uassert(StaleConfigInfo(_args.getNss(),
+ uassert(StaleConfigInfo(nss(),
ChunkVersion::IGNORED() /* receivedVersion */,
shardVersion /* wantedVersion */,
shardId,
boost::none),
str::stream() << "Range bounds do not match the shard key pattern. KeyPattern: "
<< keyPattern.toString() << " - Bounds: "
- << ChunkRange(_args.getMinKey(), _args.getMaxKey()).toString() << ".",
+ << ChunkRange(*_args.getMin(), *_args.getMax()).toString() << ".",
validBounds);
ChunkType existingChunk;
- uassert(StaleConfigInfo(_args.getNss(),
+ uassert(StaleConfigInfo(nss(),
ChunkVersion::IGNORED() /* receivedVersion */,
shardVersion /* wantedVersion */,
shardId,
boost::none),
str::stream() << "Range with bounds "
- << ChunkRange(_args.getMinKey(), _args.getMaxKey()).toString()
+ << ChunkRange(*_args.getMin(), *_args.getMax()).toString()
<< " is not owned by this shard.",
- collectionMetadata.getNextChunk(_args.getMinKey(), &existingChunk));
+ collectionMetadata.getNextChunk(*_args.getMin(), &existingChunk));
- uassert(StaleConfigInfo(_args.getNss(),
+ uassert(StaleConfigInfo(nss(),
ChunkVersion::IGNORED() /* receivedVersion */,
shardVersion /* wantedVersion */,
shardId,
boost::none),
str::stream() << "Unable to move range with bounds "
- << ChunkRange(_args.getMinKey(), _args.getMaxKey()).toString()
+ << ChunkRange(*_args.getMin(), *_args.getMax()).toString()
<< " . The closest owned chunk is "
<< ChunkRange(existingChunk.getMin(), existingChunk.getMax()).toString(),
- existingChunk.getRange().covers(ChunkRange(_args.getMinKey(), _args.getMaxKey())));
+ existingChunk.getRange().covers(ChunkRange(*_args.getMin(), *_args.getMax())));
_collectionEpoch = collectionVersion.epoch();
_collectionUUID = collectionUUID;
_chunkVersion = collectionMetadata.getChunkManager()
- ->findIntersectingChunkWithSimpleCollation(_args.getMinKey())
+ ->findIntersectingChunkWithSimpleCollation(*_args.getMin())
.getLastmod();
_moveTimingHelper.done(2);
@@ -336,9 +378,9 @@ void MigrationSourceManager::startClone() {
uassertStatusOK(ShardingLogging::get(_opCtx)->logChangeChecked(
_opCtx,
"moveChunk.start",
- _args.getNss().ns(),
- BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
- << _args.getFromShardId() << "to" << _args.getToShardId()),
+ nss().ns(),
+ BSON("min" << *_args.getMin() << "max" << *_args.getMax() << "from" << _args.getFromShard()
+ << "to" << _args.getToShard()),
ShardingCatalogClient::kMajorityWriteConcern));
_cloneAndCommitTimer.reset();
@@ -350,13 +392,13 @@ void MigrationSourceManager::startClone() {
const auto metadata = _getCurrentMetadataAndCheckEpoch();
AutoGetCollection autoColl(_opCtx,
- _args.getNss(),
+ nss(),
replEnabled ? MODE_IX : MODE_X,
AutoGetCollectionViewMode::kViewsForbidden,
_opCtx->getServiceContext()->getPreciseClockSource()->now() +
Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
- auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss());
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, nss());
const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
// Having the metadata manager registered on the collection sharding state is what indicates
@@ -364,14 +406,14 @@ void MigrationSourceManager::startClone() {
// migration, write operations require the cloner to be present in order to track changes to
// the chunk which needs to be transmitted to the recipient.
_cloneDriver = std::make_shared<MigrationChunkClonerSourceLegacy>(
- _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost);
+ _args, _writeConcern, metadata.getKeyPattern(), _donorConnStr, _recipientHost);
_coordinator.emplace(_cloneDriver->getSessionId(),
- _args.getFromShardId(),
- _args.getToShardId(),
- _args.getNss(),
+ _args.getFromShard(),
+ _args.getToShard(),
+ nss(),
*_collectionUUID,
- ChunkRange(_args.getMinKey(), _args.getMaxKey()),
+ ChunkRange(*_args.getMin(), *_args.getMax()),
*_chunkVersion,
_args.getWaitForDelete());
@@ -427,9 +469,9 @@ void MigrationSourceManager::enterCriticalSection() {
// Check that there are no chunks on the recipient shard. Write an oplog event for change
// streams if this is the first migration to the recipient.
- if (!metadata.getChunkManager()->getVersion(_args.getToShardId()).isSet()) {
+ if (!metadata.getChunkManager()->getVersion(_args.getToShard()).isSet()) {
migrationutil::notifyChangeStreamsOnRecipientFirstChunk(
- _opCtx, _args.getNss(), _args.getFromShardId(), _args.getToShardId(), _collectionUUID);
+ _opCtx, nss(), _args.getFromShard(), _args.getToShard(), _collectionUUID);
}
// Mark the shard as running critical operation, which requires recovery on crash.
@@ -445,7 +487,7 @@ void MigrationSourceManager::enterCriticalSection() {
"Starting critical section",
"migrationId"_attr = _coordinator->getMigrationId());
- _critSec.emplace(_opCtx, _args.getNss(), _critSecReason);
+ _critSec.emplace(_opCtx, nss(), _critSecReason);
_state = kCriticalSection;
@@ -457,7 +499,7 @@ void MigrationSourceManager::enterCriticalSection() {
// will stall behind the flag.
Status signalStatus = shardmetadatautil::updateShardCollectionsEntry(
_opCtx,
- BSON(ShardCollectionType::kNssFieldName << _args.getNss().ns()),
+ BSON(ShardCollectionType::kNssFieldName << nss().ns()),
BSON("$inc" << BSON(ShardCollectionType::kEnterCriticalSectionCounterFieldName << 1)),
false /*upsert*/);
if (!signalStatus.isOK()) {
@@ -479,7 +521,8 @@ void MigrationSourceManager::commitChunkOnRecipient() {
invariant(_state == kCriticalSection);
ScopeGuard scopedGuard([&] {
_cleanupOnError();
- migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, _args.getNss());
+ migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx,
+ _args.getCommandParameter());
});
// Tell the recipient shard to fetch the latest changes.
@@ -502,9 +545,10 @@ void MigrationSourceManager::commitChunkOnRecipient() {
void MigrationSourceManager::commitChunkMetadataOnConfig() {
invariant(!_opCtx->lockState()->isLocked());
invariant(_state == kCloneCompleted);
+
ScopeGuard scopedGuard([&] {
_cleanupOnError();
- migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, _args.getNss());
+ migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, nss());
});
// If we have chunks left on the FROM shard, bump the version of one of them as well. This will
@@ -516,15 +560,15 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
const auto metadata = _getCurrentMetadataAndCheckEpoch();
ChunkType migratedChunkType;
- migratedChunkType.setMin(_args.getMinKey());
- migratedChunkType.setMax(_args.getMaxKey());
+ migratedChunkType.setMin(*_args.getMin());
+ migratedChunkType.setMax(*_args.getMax());
migratedChunkType.setVersion(*_chunkVersion);
const auto currentTime = VectorClock::get(_opCtx)->getTime();
CommitChunkMigrationRequest::appendAsCommand(&builder,
- _args.getNss(),
- _args.getFromShardId(),
- _args.getToShardId(),
+ nss(),
+ _args.getFromShard(),
+ _args.getToShard(),
migratedChunkType,
metadata.getCollVersion(),
currentTime.clusterTime().asTimestamp());
@@ -559,12 +603,12 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
if (!migrationCommitStatus.isOK()) {
{
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX);
- CollectionShardingRuntime::get(_opCtx, _args.getNss())->clearFilteringMetadata(_opCtx);
+ AutoGetCollection autoColl(_opCtx, nss(), MODE_IX);
+ CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx);
}
scopedGuard.dismiss();
_cleanup(false);
- migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, _args.getNss());
+ migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, nss());
uassertStatusOK(migrationCommitStatus);
}
@@ -582,7 +626,7 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
"Starting post-migration commit refresh on the shard",
"migrationId"_attr = _coordinator->getMigrationId());
- forceShardFilteringMetadataRefresh(_opCtx, _args.getNss());
+ forceShardFilteringMetadataRefresh(_opCtx, nss());
LOGV2_DEBUG_OPTIONS(4817405,
2,
@@ -598,13 +642,13 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
"error"_attr = redact(ex));
{
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX);
- CollectionShardingRuntime::get(_opCtx, _args.getNss())->clearFilteringMetadata(_opCtx);
+ AutoGetCollection autoColl(_opCtx, nss(), MODE_IX);
+ CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx);
}
scopedGuard.dismiss();
_cleanup(false);
// Best-effort recovery of the shard version.
- onShardVersionMismatchNoExcept(_opCtx, _args.getNss(), boost::none).ignore();
+ onShardVersionMismatchNoExcept(_opCtx, nss(), boost::none).ignore();
throw;
}
@@ -613,9 +657,9 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
// Check if there are no chunks left on donor shard. Write an oplog event for change streams if
// the last chunk migrated off the donor.
- if (!refreshedMetadata.getChunkManager()->getVersion(_args.getFromShardId()).isSet()) {
+ if (!refreshedMetadata.getChunkManager()->getVersion(_args.getFromShard()).isSet()) {
migrationutil::notifyChangeStreamsOnDonorLastChunk(
- _opCtx, _args.getNss(), _args.getFromShardId(), _collectionUUID);
+ _opCtx, nss(), _args.getFromShard(), _collectionUUID);
}
@@ -627,9 +671,9 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
// If the migration has succeeded, clear the BucketCatalog so that the buckets that got migrated
// out are no longer updatable.
- if (_args.getNss().isTimeseriesBucketsCollection()) {
+ if (nss().isTimeseriesBucketsCollection()) {
auto& bucketCatalog = BucketCatalog::get(_opCtx);
- bucketCatalog.clear(_args.getNss().getTimeseriesViewNamespace());
+ bucketCatalog.clear(nss().getTimeseriesViewNamespace());
}
_coordinator->setMigrationDecision(DecisionEnum::kCommitted);
@@ -652,23 +696,22 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
ShardingLogging::get(_opCtx)->logChange(
_opCtx,
"moveChunk.commit",
- _args.getNss().ns(),
- BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
- << _args.getFromShardId() << "to" << _args.getToShardId() << "counts"
- << *_recipientCloneCounts),
+ nss().ns(),
+ BSON("min" << *_args.getMin() << "max" << *_args.getMax() << "from" << _args.getFromShard()
+ << "to" << _args.getToShard() << "counts" << *_recipientCloneCounts),
ShardingCatalogClient::kMajorityWriteConcern);
- const ChunkRange range(_args.getMinKey(), _args.getMaxKey());
+ const ChunkRange range(*_args.getMin(), *_args.getMax());
if (!_acquireCSOnRecipient && !MONGO_unlikely(doNotRefreshRecipientAfterCommit.shouldFail())) {
// Best-effort make the recipient refresh its routing table to the new collection
// version.
refreshRecipientRoutingTable(
- _opCtx, _args.getNss(), _recipientHost, refreshedMetadata.getCollVersion());
+ _opCtx, nss(), _recipientHost, refreshedMetadata.getCollVersion());
}
std::string orphanedRangeCleanUpErrMsg = str::stream()
- << "Moved chunks successfully but failed to clean up " << _args.getNss() << " range "
+ << "Moved chunks successfully but failed to clean up " << nss() << " range "
<< redact(range.toString()) << " due to: ";
if (_args.getWaitForDelete()) {
@@ -676,7 +719,7 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() {
"Waiting for migration cleanup after chunk commit for the namespace {namespace} "
"and range {range}",
"Waiting for migration cleanup after chunk commit",
- "namespace"_attr = _args.getNss(),
+ "namespace"_attr = nss(),
"range"_attr = redact(range.toString()),
"migrationId"_attr = _coordinator->getMigrationId());
@@ -703,9 +746,9 @@ void MigrationSourceManager::_cleanupOnError() noexcept {
ShardingLogging::get(_opCtx)->logChange(
_opCtx,
"moveChunk.error",
- _args.getNss().ns(),
- BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
- << _args.getFromShardId() << "to" << _args.getToShardId()),
+ _args.getCommandParameter().ns(),
+ BSON("min" << *_args.getMin() << "max" << *_args.getMax() << "from" << _args.getFromShard()
+ << "to" << _args.getToShard()),
ShardingCatalogClient::kMajorityWriteConcern);
_cleanup(true);
@@ -722,8 +765,8 @@ SharedSemiFuture<void> MigrationSourceManager::abort() {
CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() {
auto metadata = [&] {
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IS);
- auto* const css = CollectionShardingRuntime::get(_opCtx, _args.getNss());
+ AutoGetCollection autoColl(_opCtx, _args.getCommandParameter(), MODE_IS);
+ auto* const css = CollectionShardingRuntime::get(_opCtx, _args.getCommandParameter());
const auto optMetadata = css->getCurrentMetadataIfKnown();
uassert(ErrorCodes::ConflictingOperationInProgress,
@@ -749,8 +792,8 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept {
auto cloneDriver = [&]() {
// Unregister from the collection's sharding state and exit the migration critical section.
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX);
- auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss());
+ AutoGetCollection autoColl(_opCtx, nss(), MODE_IX);
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, nss());
const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
if (_state != kCreated) {
@@ -806,7 +849,7 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept {
// is possible that the persisted metadata is rolled back after step down, but the
// write which cleared the 'inMigration' flag is not, a secondary node will report
// itself at an older shard version.
- CatalogCacheLoader::get(newOpCtx).waitForCollectionFlush(newOpCtx, _args.getNss());
+ CatalogCacheLoader::get(newOpCtx).waitForCollectionFlush(newOpCtx, nss());
// Clear the 'minOpTime recovery' document so that the next time a node from this
// shard becomes a primary, it won't have to recover the config server optime.
@@ -827,24 +870,24 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept {
"Failed to complete the migration {migrationId} with "
"{chunkMigrationRequestParameters} due to: {error}",
"Failed to complete the migration",
- "chunkMigrationRequestParameters"_attr = redact(_args.toString()),
+ "chunkMigrationRequestParameters"_attr = redact(_args.toBSON({})),
"error"_attr = redact(ex),
"migrationId"_attr = _coordinator->getMigrationId());
        // Something went really wrong while completing the migration; just unset the metadata and
        // let the next operation recover.
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
- AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX);
- CollectionShardingRuntime::get(_opCtx, _args.getNss())->clearFilteringMetadata(_opCtx);
+ AutoGetCollection autoColl(_opCtx, nss(), MODE_IX);
+ CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx);
}
}
BSONObj MigrationSourceManager::getMigrationStatusReport() const {
- return migrationutil::makeMigrationStatusDocument(_args.getNss(),
- _args.getFromShardId(),
- _args.getToShardId(),
+ return migrationutil::makeMigrationStatusDocument(_args.getCommandParameter(),
+ _args.getFromShard(),
+ _args.getToShard(),
true,
- _args.getMinKey(),
- _args.getMaxKey());
+ *_args.getMin(),
+ *_args.getMax());
}
MigrationSourceManager::ScopedRegisterer::ScopedRegisterer(
@@ -857,8 +900,8 @@ MigrationSourceManager::ScopedRegisterer::ScopedRegisterer(
MigrationSourceManager::ScopedRegisterer::~ScopedRegisterer() {
UninterruptibleLockGuard noInterrupt(_msm->_opCtx->lockState());
- AutoGetCollection autoColl(_msm->_opCtx, _msm->_args.getNss(), MODE_IX);
- auto csr = CollectionShardingRuntime::get(_msm->_opCtx, _msm->_args.getNss());
+ AutoGetCollection autoColl(_msm->_opCtx, _msm->_args.getCommandParameter(), MODE_IX);
+ auto csr = CollectionShardingRuntime::get(_msm->_opCtx, _msm->_args.getCommandParameter());
auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_msm->_opCtx, csr);
invariant(_msm == std::exchange(msmForCsr(csr), nullptr));
}
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 9b1b64dd87f..d93c701f3d5 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -35,7 +35,7 @@
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_coordinator.h"
#include "mongo/db/s/move_timing_helper.h"
-#include "mongo/s/request_types/move_chunk_request.h"
+#include "mongo/s/request_types/move_range_request_gen.h"
#include "mongo/util/timer.h"
namespace mongo {
@@ -101,7 +101,8 @@ public:
* to be after acquiring the distributed lock.
*/
MigrationSourceManager(OperationContext* opCtx,
- MoveChunkRequest request,
+ ShardsvrMoveRange&& request,
+ WriteConcernOptions&& writeConcern,
ConnectionString donorConnStr,
HostAndPort recipientHost);
~MigrationSourceManager();
@@ -171,6 +172,10 @@ public:
*/
BSONObj getMigrationStatusReport() const;
+ const NamespaceString& nss() {
+ return _args.getCommandParameter();
+ }
+
private:
// Used to track the current state of the source manager. See the methods above, which have
// comments explaining the various state transitions.
@@ -206,8 +211,11 @@ private:
// The caller must guarantee it outlives the MigrationSourceManager.
OperationContext* const _opCtx;
- // The parameters to the moveChunk command
- const MoveChunkRequest _args;
+ // The parameters to the moveRange command
+ ShardsvrMoveRange _args;
+
+ // The write concern received for the moveRange command
+ const WriteConcernOptions _writeConcern;
// The resolved connection string of the donor shard
const ConnectionString _donorConnStr;
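
Note on the new constructor declared above: the IDL-generated ShardsvrMoveRange request does not embed a write concern, so the caller now passes it separately and the manager takes ownership of both by move. A minimal sketch of the corresponding member initialization, assuming it only stores the arguments shown in this header (member names beyond those declared above are assumptions; the real constructor in migration_source_manager.cpp does additional setup):

    MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
                                                   ShardsvrMoveRange&& request,
                                                   WriteConcernOptions&& writeConcern,
                                                   ConnectionString donorConnStr,
                                                   HostAndPort recipientHost)
        : _opCtx(opCtx),
          _args(std::move(request)),
          _writeConcern(std::move(writeConcern)),
          _donorConnStr(std::move(donorConnStr)),
          _recipientHost(std::move(recipientHost)) {  // member name assumed for illustration
        // ... remaining members and validation elided ...
    }
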
diff --git a/src/mongo/db/s/move_timing_helper.cpp b/src/mongo/db/s/move_timing_helper.cpp
index f3270f67986..f62adc83956 100644
--- a/src/mongo/db/s/move_timing_helper.cpp
+++ b/src/mongo/db/s/move_timing_helper.cpp
@@ -44,8 +44,8 @@ namespace mongo {
MoveTimingHelper::MoveTimingHelper(OperationContext* opCtx,
const std::string& where,
const std::string& ns,
- const BSONObj& min,
- const BSONObj& max,
+ const boost::optional<BSONObj>& min,
+ const boost::optional<BSONObj>& max,
int totalNumSteps,
std::string* cmdErrmsg,
const ShardId& toShard,
@@ -55,17 +55,19 @@ MoveTimingHelper::MoveTimingHelper(OperationContext* opCtx,
_ns(ns),
_to(toShard),
_from(fromShard),
+ _min(min),
+ _max(max),
_totalNumSteps(totalNumSteps),
_cmdErrmsg(cmdErrmsg),
- _nextStep(0) {
- _b.append("min", min);
- _b.append("max", max);
-}
+ _nextStep(0) {}
MoveTimingHelper::~MoveTimingHelper() {
// even if logChange doesn't throw, bson does
// sigh
try {
+ _b.append("min", _min.get_value_or(BSONObj()));
+ _b.append("max", _max.get_value_or(BSONObj()));
+
if (_to.isValid()) {
_b.append("to", _to.toString());
}
diff --git a/src/mongo/db/s/move_timing_helper.h b/src/mongo/db/s/move_timing_helper.h
index 9d7f75ee90e..a90dd465090 100644
--- a/src/mongo/db/s/move_timing_helper.h
+++ b/src/mongo/db/s/move_timing_helper.h
@@ -45,14 +45,22 @@ public:
MoveTimingHelper(OperationContext* opCtx,
const std::string& where,
const std::string& ns,
- const BSONObj& min,
- const BSONObj& max,
+ const boost::optional<BSONObj>& min,
+ const boost::optional<BSONObj>& max,
int totalNumSteps,
std::string* cmdErrmsg,
const ShardId& toShard,
const ShardId& fromShard);
~MoveTimingHelper();
+ void setMin(const BSONObj& min) {
+ _min.emplace(min);
+ }
+
+ void setMax(const BSONObj& max) {
+ _max.emplace(max);
+ }
+
void done(int step);
private:
@@ -64,6 +72,8 @@ private:
const std::string _ns;
const ShardId _to;
const ShardId _from;
+
+ boost::optional<BSONObj> _min, _max;
const int _totalNumSteps;
const std::string* _cmdErrmsg;
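
Because both bounds of a moveRange may not be known when timing starts, MoveTimingHelper now keeps min/max as optionals and only serializes them in its destructor, so a bound resolved mid-migration still ends up in the change-log document via setMin()/setMax(). A minimal usage sketch (the surrounding variables and step count are illustrative, not taken from this patch):

    // Start timing a moveRange whose max bound has not been computed yet.
    MoveTimingHelper timing(opCtx,
                            "from",                  // where
                            nss.ns(),                // namespace
                            request.getMin(),        // min bound, if provided
                            boost::none,             // max bound still unknown
                            6,                       // total number of steps
                            &errmsg,
                            request.getToShard(),
                            request.getFromShard());

    // ... later, once the missing bound has been resolved ...
    timing.setMax(computedMax);  // picked up by ~MoveTimingHelper() when the entry is logged
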
diff --git a/src/mongo/db/s/shardsvr_move_range_command.cpp b/src/mongo/db/s/shardsvr_move_range_command.cpp
index ad92264d0cd..0eb40a7fd31 100644
--- a/src/mongo/db/s/shardsvr_move_range_command.cpp
+++ b/src/mongo/db/s/shardsvr_move_range_command.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/commands.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/active_migrations_registry.h"
-#include "mongo/db/s/auto_split_vector.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -86,31 +85,6 @@ public:
void typedRun(OperationContext* opCtx) {
uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
opCtx->setAlwaysInterruptAtStepDownOrUp();
- const auto& originalReq = request();
-
- const auto WC = opCtx->getWriteConcern();
- BSONObjBuilder moveChunkReqBuilder(originalReq.toBSON({}));
- moveChunkReqBuilder.append(WriteConcernOptions::kWriteConcernField, WC.toBSON());
-
- // TODO SERVER-64926 do not assume min always present
- const auto& min = *request().getMin();
-
- // TODO SERVER-64817 compute missing bound of `moveRange` within MigrationSourceManager
- if (!originalReq.getMax().is_initialized()) {
- // Compute the max bound in case only `min` is set (moveRange)
- const auto cm = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, ns()));
- uassert(ErrorCodes::NamespaceNotSharded,
- "Could not move chunk. Collection is no longer sharded",
- cm.isSharded());
-
- const auto owningChunk = cm.findIntersectingChunkWithSimpleCollation(min);
- const auto max = computeMaxBound(opCtx, owningChunk, cm.getShardKeyPattern());
- moveChunkReqBuilder.append(MoveRangeRequest::kMaxFieldName, max);
- }
-
- MoveChunkRequest moveChunkRequest = uassertStatusOK(
- MoveChunkRequest::createFromCommand(ns(), moveChunkReqBuilder.obj()));
// Make sure we're as up-to-date as possible with shard information. This catches the
// case where we might have changed a shard's host by removing/adding a shard with the
@@ -118,13 +92,14 @@ public:
Grid::get(opCtx)->shardRegistry()->reload(opCtx);
auto scopedMigration = uassertStatusOK(
- ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, moveChunkRequest));
+ ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, request()));
// Check if there is an existing migration running and if so, join it
if (scopedMigration.mustExecute()) {
auto moveChunkComplete =
ExecutorFuture<void>(_getExecutor())
- .then([moveChunkRequest,
+ .then([req = request(),
+ writeConcern = opCtx->getWriteConcern(),
scopedMigration = std::move(scopedMigration),
serviceContext = opCtx->getServiceContext()]() mutable {
// This local variable is created to enforce that the scopedMigration is
@@ -157,7 +132,7 @@ public:
Status status = {ErrorCodes::InternalError, "Uninitialized value"};
try {
- _runImpl(opCtx, moveChunkRequest);
+ _runImpl(opCtx, std::move(req), std::move(writeConcern));
status = Status::OK();
} catch (const DBException& e) {
status = e.toStatus();
@@ -180,7 +155,7 @@ public:
uassertStatusOK(scopedMigration.waitForCompletion(opCtx));
}
- if (moveChunkRequest.getWaitForDelete()) {
+ if (request().getWaitForDelete()) {
// Ensure we capture the latest opTime in the system, since range deletion happens
// asynchronously with a different OperationContext. This must be done after the
// above join, because each caller must set the opTime to wait for writeConcern for
@@ -214,35 +189,10 @@ public:
ActionType::internal));
}
- /*
- * Compute the max bound in case only `min` is set (moveRange).
- *
- * Taking into account the provided max chunk size, returns:
- * - A `max` bound to perform split+move in case the chunk owning `min` is splittable.
- * - The `max` bound of the chunk owning `min in case it can't be split (too small or
- * jumbo).
- */
- BSONObj computeMaxBound(OperationContext* opCtx,
- const Chunk& owningChunk,
- const ShardKeyPattern& skPattern) {
- // TODO SERVER-64926 do not assume min always present
- const auto& min = *request().getMin();
- auto [splitKeys, _] = autoSplitVector(opCtx,
- ns(),
- skPattern.toBSON(),
- min,
- owningChunk.getMax(),
- *request().getMaxChunkSizeBytes(),
- 1);
- if (splitKeys.size()) {
- return std::move(splitKeys.front());
- }
-
- return owningChunk.getMax();
- }
-
- static void _runImpl(OperationContext* opCtx, const MoveChunkRequest& moveChunkRequest) {
- if (moveChunkRequest.getFromShardId() == moveChunkRequest.getToShardId()) {
+ static void _runImpl(OperationContext* opCtx,
+ ShardsvrMoveRange&& request,
+ WriteConcernOptions&& writeConcern) {
+ if (request.getFromShard() == request.getToShard()) {
// TODO: SERVER-46669 handle wait for delete.
return;
}
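
The helper removed above resolved the missing max bound on the command path; the removed TODO (SERVER-64817) indicates that this responsibility moves into MigrationSourceManager, which is why _runImpl now simply forwards the request and write concern. A sketch of what the relocated computation presumably looks like, assuming it mirrors the removed computeMaxBound() (the function name and placement are illustrative; the actual code lives in migration_source_manager.cpp):

    // Resolve the missing max bound from the chunk owning `min`: take the first split point
    // within the configured max chunk size when the chunk is splittable, otherwise fall back
    // to the owning chunk's own max (chunk too small, or jumbo).
    BSONObj resolveMissingMaxBound(OperationContext* opCtx,
                                   const NamespaceString& nss,
                                   const Chunk& owningChunk,
                                   const ShardKeyPattern& skPattern,
                                   const BSONObj& min,
                                   long long maxChunkSizeBytes) {
        auto [splitKeys, _] = autoSplitVector(
            opCtx, nss, skPattern.toBSON(), min, owningChunk.getMax(), maxChunkSizeBytes, 1);
        return splitKeys.empty() ? owningChunk.getMax() : std::move(splitKeys.front());
    }
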
@@ -251,18 +201,18 @@ public:
auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
const auto donorConnStr =
- uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getFromShardId()))
+ uassertStatusOK(shardRegistry->getShard(opCtx, request.getFromShard()))
->getConnString();
const auto recipientHost = uassertStatusOK([&] {
- auto recipientShard = uassertStatusOK(
- shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId()));
+ auto recipientShard =
+ uassertStatusOK(shardRegistry->getShard(opCtx, request.getToShard()));
return recipientShard->getTargeter()->findHost(
opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
}());
MigrationSourceManager migrationSourceManager(
- opCtx, moveChunkRequest, donorConnStr, recipientHost);
+ opCtx, std::move(request), std::move(writeConcern), donorConnStr, recipientHost);
migrationSourceManager.startClone();
migrationSourceManager.awaitToCatchUp();