author | Pierlauro Sciarelli <pierlauro.sciarelli@mongodb.com> | 2022-04-25 13:19:16 +0000
---|---|---
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-25 14:08:47 +0000
commit | 36d3af6e81247713f53065769e73f4f0fa8e622d (patch) |
tree | 773b9067805b081570745bdae549609c999e1ada /src/mongo/db |
parent | 62e65260473152d8655d4a910505e6404067b907 (diff) |
download | mongo-36d3af6e81247713f53065769e73f4f0fa8e622d.tar.gz |
SERVER-64817 Compute missing bound of moveRange within MigrationSourceManager
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.cpp | 78
-rw-r--r-- | src/mongo/db/s/active_migrations_registry.h | 9
-rw-r--r-- | src/mongo/db/s/active_migrations_registry_test.cpp | 45
-rw-r--r-- | src/mongo/db/s/balancer/balancer.cpp | 6
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp | 9
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 106
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 26
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp | 108
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 227
-rw-r--r-- | src/mongo/db/s/migration_source_manager.h | 16
-rw-r--r-- | src/mongo/db/s/move_timing_helper.cpp | 14
-rw-r--r-- | src/mongo/db/s/move_timing_helper.h | 14
-rw-r--r-- | src/mongo/db/s/shardsvr_move_range_command.cpp | 76
13 files changed, 387 insertions, 347 deletions
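Before the per-file hunks, a condensed sketch of the behaviour this commit introduces may help: when a moveRange request arrives with only a `min` bound, MigrationSourceManager now derives the `max` bound itself, preferring the first split point suggested for the configured max chunk size and falling back to the owning chunk's upper bound (in the migration_source_manager.cpp hunk below, the new helper asks autoSplitVector for at most one split key). The sketch models only that decision with plain standard-library types; `ChunkBounds` and `SplitVectorFn` are hypothetical stand-ins for `Chunk` and `autoSplitVector`, not MongoDB APIs.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for the owning chunk's [min, max) bounds (hypothetical, not mongo::Chunk).
struct ChunkBounds {
    std::string min;
    std::string max;
};

// Stand-in for autoSplitVector(): returns candidate split keys for the range starting at
// `min`, honouring the configured maximum chunk size. Injected here as a callback.
using SplitVectorFn = std::function<std::vector<std::string>(
    const std::string& min, const std::string& chunkMax, long long maxChunkSizeBytes)>;

// Mirrors the decision made by the new computeMaxBound() helper: use the first suggested
// split key if the chunk is splittable, otherwise move the whole chunk.
std::string computeMaxBound(const std::string& min,
                            const ChunkBounds& owningChunk,
                            long long maxChunkSizeBytes,
                            const SplitVectorFn& splitVector) {
    auto splitKeys = splitVector(min, owningChunk.max, maxChunkSizeBytes);
    if (!splitKeys.empty()) {
        return splitKeys.front();  // split + move the leading sub-range
    }
    return owningChunk.max;  // too small or jumbo: move the entire chunk
}

int main() {
    ChunkBounds chunk{"x: 0", "x: 1000"};

    // Pretend the range is large enough to be split at x: 250.
    SplitVectorFn splittable = [](const std::string&, const std::string&, long long) {
        return std::vector<std::string>{"x: 250"};
    };
    // Pretend the range cannot be split (too small or jumbo).
    SplitVectorFn unsplittable = [](const std::string&, const std::string&, long long) {
        return std::vector<std::string>{};
    };

    std::cout << computeMaxBound("x: 0", chunk, 1024, splittable) << "\n";    // x: 250
    std::cout << computeMaxBound("x: 0", chunk, 1024, unsplittable) << "\n";  // x: 1000
    return 0;
}
```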
diff --git a/src/mongo/db/s/active_migrations_registry.cpp b/src/mongo/db/s/active_migrations_registry.cpp index 3ed54b597f6..baaa763daaa 100644 --- a/src/mongo/db/s/active_migrations_registry.cpp +++ b/src/mongo/db/s/active_migrations_registry.cpp @@ -93,11 +93,12 @@ void ActiveMigrationsRegistry::unlock(StringData reason) { } StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( - OperationContext* opCtx, const MoveChunkRequest& args) { + OperationContext* opCtx, const ShardsvrMoveRange& args) { stdx::unique_lock<Latch> ul(_mutex); opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { - return !_migrationsBlocked && !_activeSplitMergeChunkStates.count(args.getNss()); + return !_migrationsBlocked && + !_activeSplitMergeChunkStates.count(args.getCommandParameter()); }); if (_activeReceiveChunkState) { @@ -105,24 +106,23 @@ StatusWith<ScopedDonateChunk> ActiveMigrationsRegistry::registerDonateChunk( } if (_activeMoveChunkState) { - if (_activeMoveChunkState->args == args) { - LOGV2(5004704, - "registerDonateChunk ", - "keys"_attr = ChunkRange(args.getMinKey(), args.getMaxKey()).toString(), - "toShardId"_attr = args.getToShardId(), - logAttrs(args.getNss())); + auto activeMoveChunkStateBSON = _activeMoveChunkState->args.toBSON({}); + + if (activeMoveChunkStateBSON.woCompare(args.toBSON({})) == 0) { + LOGV2(6386800, + "Registering new chunk donation", + logAttrs(args.getCommandParameter()), + "min"_attr = args.getMin(), + "max"_attr = args.getMax(), + "toShardId"_attr = args.getToShard()); return {ScopedDonateChunk(nullptr, false, _activeMoveChunkState->notification)}; } - LOGV2(5004700, - "registerDonateChunk", - "currentKeys"_attr = ChunkRange(_activeMoveChunkState->args.getMinKey(), - _activeMoveChunkState->args.getMaxKey()) - .toString(), - "currentToShardId"_attr = _activeMoveChunkState->args.getToShardId(), - "newKeys"_attr = ChunkRange(args.getMinKey(), args.getMaxKey()).toString(), - "newToShardId"_attr = args.getToShardId(), - logAttrs(args.getNss())); + LOGV2(6386801, + "Rejecting donate chunk due to conflicting migration in progress", + logAttrs(args.getCommandParameter()), + "runningMigration"_attr = activeMoveChunkStateBSON, + "requestedMigration"_attr = args.toBSON({})); return _activeMoveChunkState->constructErrorStatus(); } @@ -147,13 +147,10 @@ StatusWith<ScopedReceiveChunk> ActiveMigrationsRegistry::registerReceiveChunk( } if (_activeMoveChunkState) { - LOGV2(5004701, - "registerReceiveChunk ", - "currentKeys"_attr = ChunkRange(_activeMoveChunkState->args.getMinKey(), - _activeMoveChunkState->args.getMaxKey()) - .toString(), - "currentToShardId"_attr = _activeMoveChunkState->args.getToShardId(), - logAttrs(_activeMoveChunkState->args.getNss())); + LOGV2(6386802, + "Rejecting receive chunk due to conflicting donate chunk in progress", + logAttrs(_activeMoveChunkState->args.getCommandParameter()), + "runningMigration"_attr = _activeMoveChunkState->args.toBSON({})); return _activeMoveChunkState->constructErrorStatus(); } @@ -167,7 +164,8 @@ StatusWith<ScopedSplitMergeChunk> ActiveMigrationsRegistry::registerSplitOrMerge stdx::unique_lock<Latch> ul(_mutex); opCtx->waitForConditionOrInterrupt(_chunkOperationsStateChangedCV, ul, [&] { - return !(_activeMoveChunkState && _activeMoveChunkState->args.getNss() == nss) && + return !(_activeMoveChunkState && + _activeMoveChunkState->args.getCommandParameter() == nss) && !_activeSplitMergeChunkStates.count(nss); }); @@ -181,7 +179,7 @@ StatusWith<ScopedSplitMergeChunk> 
ActiveMigrationsRegistry::registerSplitOrMerge boost::optional<NamespaceString> ActiveMigrationsRegistry::getActiveDonateChunkNss() { stdx::lock_guard<Latch> lk(_mutex); if (_activeMoveChunkState) { - return _activeMoveChunkState->args.getNss(); + return _activeMoveChunkState->args.getCommandParameter(); } return boost::none; @@ -193,7 +191,7 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex stdx::lock_guard<Latch> lk(_mutex); if (_activeMoveChunkState) { - nss = _activeMoveChunkState->args.getNss(); + nss = _activeMoveChunkState->args.getCommandParameter(); } } @@ -218,12 +216,12 @@ BSONObj ActiveMigrationsRegistry::getActiveMigrationStatusReport(OperationContex void ActiveMigrationsRegistry::_clearDonateChunk() { stdx::lock_guard<Latch> lk(_mutex); invariant(_activeMoveChunkState); - LOGV2(5004702, - "clearDonateChunk ", - "currentKeys"_attr = ChunkRange(_activeMoveChunkState->args.getMinKey(), - _activeMoveChunkState->args.getMaxKey()) - .toString(), - "currentToShardId"_attr = _activeMoveChunkState->args.getToShardId()); + LOGV2(6386803, + "Unregistering donate chunk", + logAttrs(_activeMoveChunkState->args.getCommandParameter()), + "min"_attr = _activeMoveChunkState->args.getMin().get_value_or(BSONObj()), + "max"_attr = _activeMoveChunkState->args.getMax().get_value_or(BSONObj()), + "toShardId"_attr = _activeMoveChunkState->args.getToShard()); _activeMoveChunkState.reset(); _chunkOperationsStateChangedCV.notify_all(); } @@ -247,12 +245,14 @@ void ActiveMigrationsRegistry::_clearSplitMergeChunk(const NamespaceString& nss) } Status ActiveMigrationsRegistry::ActiveMoveChunkState::constructErrorStatus() const { - return {ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to start new balancer operation because this shard is " - "currently donating chunk " - << ChunkRange(args.getMinKey(), args.getMaxKey()).toString() - << " for namespace " << args.getNss().ns() << " to " - << args.getToShardId()}; + std::string errMsg = fmt::format( + "Unable to start new balancer operation because this shard is currently donating range " + "'{}{}' for namespace {} to shard {}", + (args.getMin() ? "min: " + args.getMin()->toString() + " - " : ""), + (args.getMax() ? "max: " + args.getMax()->toString() : ""), + args.getCommandParameter().ns(), + args.getToShard().toString()); + return {ErrorCodes::ConflictingOperationInProgress, std::move(errMsg)}; } Status ActiveMigrationsRegistry::ActiveReceiveChunkState::constructErrorStatus() const { diff --git a/src/mongo/db/s/active_migrations_registry.h b/src/mongo/db/s/active_migrations_registry.h index f6a99185d4a..f7616cc0f9f 100644 --- a/src/mongo/db/s/active_migrations_registry.h +++ b/src/mongo/db/s/active_migrations_registry.h @@ -33,7 +33,8 @@ #include "mongo/db/s/migration_session_id.h" #include "mongo/platform/mutex.h" -#include "mongo/s/request_types/move_chunk_request.h" +#include "mongo/s/catalog/type_chunk.h" +#include "mongo/s/request_types/move_range_request_gen.h" #include "mongo/util/concurrency/notification.h" namespace mongo { @@ -87,7 +88,7 @@ public: * Otherwise returns a ConflictingOperationInProgress error. 
*/ StatusWith<ScopedDonateChunk> registerDonateChunk(OperationContext* opCtx, - const MoveChunkRequest& args); + const ShardsvrMoveRange& args); /** * If there are no migrations or split/merges running on this shard, registers an active receive @@ -132,7 +133,7 @@ private: // Describes the state of a currently active moveChunk operation struct ActiveMoveChunkState { - ActiveMoveChunkState(MoveChunkRequest inArgs) + ActiveMoveChunkState(ShardsvrMoveRange inArgs) : args(std::move(inArgs)), notification(std::make_shared<Notification<Status>>()) {} /** @@ -141,7 +142,7 @@ private: Status constructErrorStatus() const; // Exact arguments of the currently active operation - MoveChunkRequest args; + ShardsvrMoveRange args; // Notification event that will be signaled when the currently active operation completes std::shared_ptr<Notification<Status>> notification; diff --git a/src/mongo/db/s/active_migrations_registry_test.cpp b/src/mongo/db/s/active_migrations_registry_test.cpp index f637e3c4e1f..389728c8ace 100644 --- a/src/mongo/db/s/active_migrations_registry_test.cpp +++ b/src/mongo/db/s/active_migrations_registry_test.cpp @@ -58,27 +58,22 @@ protected: ServiceContext::UniqueOperationContext _opCtx; }; -MoveChunkRequest createMoveChunkRequest(const NamespaceString& nss) { - const ChunkVersion chunkVersion(1, 2, OID::gen(), Timestamp(1, 1)); - - BSONObjBuilder builder; - MoveChunkRequest::appendAsCommand( - &builder, - nss, - chunkVersion, - ShardId("shard0001"), - ShardId("shard0002"), - ChunkRange(BSON("Key" << -100), BSON("Key" << 100)), - 1024, - MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff), - true, - MoveChunkRequest::ForceJumbo::kDoNotForce); - return assertGet(MoveChunkRequest::createFromCommand(nss, builder.obj())); +ShardsvrMoveRange createMoveRangeRequest(const NamespaceString& nss, + const OID& epoch = OID::gen()) { + const ShardId fromShard = ShardId("shard0001"); + const long long maxChunkSizeBytes = 1024; + ShardsvrMoveRange req(nss, fromShard, maxChunkSizeBytes); + req.setEpoch(epoch); + req.getMoveRangeRequestBase().setToShard(ShardId("shard0002")); + req.setMaxChunkSizeBytes(1024); + req.getMoveRangeRequestBase().setMin(BSON("Key" << -100)); + req.getMoveRangeRequestBase().setMax(BSON("Key" << 100)); + return req; } TEST_F(MoveChunkRegistration, ScopedDonateChunkMoveConstructorAndAssignment) { auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk( - operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")))); + operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl")))); ASSERT(originalScopedDonateChunk.mustExecute()); ScopedDonateChunk movedScopedDonateChunk(std::move(originalScopedDonateChunk)); @@ -97,7 +92,7 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) { const NamespaceString nss("TestDB", "TestColl"); auto originalScopedDonateChunk = - assertGet(_registry.registerDonateChunk(operationContext(), createMoveChunkRequest(nss))); + assertGet(_registry.registerDonateChunk(operationContext(), createMoveRangeRequest(nss))); ASSERT_EQ(nss.ns(), _registry.getActiveDonateChunkNss()->ns()); @@ -107,10 +102,10 @@ TEST_F(MoveChunkRegistration, GetActiveMigrationNamespace) { TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgress) { auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk( - operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl1")))); + operationContext(), 
createMoveRangeRequest(NamespaceString("TestDB", "TestColl1")))); auto secondScopedDonateChunkStatus = _registry.registerDonateChunk( - operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl2"))); + operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl2"))); ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress, secondScopedDonateChunkStatus.getStatus()); @@ -118,12 +113,14 @@ TEST_F(MoveChunkRegistration, SecondMigrationReturnsConflictingOperationInProgre } TEST_F(MoveChunkRegistration, SecondMigrationWithSameArgumentsJoinsFirst) { + const auto epoch = OID::gen(); + auto originalScopedDonateChunk = assertGet(_registry.registerDonateChunk( - operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")))); + operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"), epoch))); ASSERT(originalScopedDonateChunk.mustExecute()); auto secondScopedDonateChunk = assertGet(_registry.registerDonateChunk( - operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl")))); + operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"), epoch))); ASSERT(!secondScopedDonateChunk.mustExecute()); originalScopedDonateChunk.signalComplete({ErrorCodes::InternalError, "Test error"}); @@ -171,7 +168,7 @@ TEST_F(MoveChunkRegistration, TestBlockingDonateChunk) { // 6. Now that we're woken up by the registry thread, let's attempt to start to donate. // This will block and call the lambda set on the baton above. auto scopedDonateChunk = _registry.registerDonateChunk( - opCtx.get(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))); + opCtx.get(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"))); ASSERT_OK(scopedDonateChunk.getStatus()); scopedDonateChunk.getValue().signalComplete(Status::OK()); @@ -262,7 +259,7 @@ TEST_F(MoveChunkRegistration, TestBlockingWhileDonateInProgress) { auto result = stdx::async(stdx::launch::async, [&] { // 2. Start a migration so that the registry lock will block when acquired. auto scopedDonateChunk = _registry.registerDonateChunk( - operationContext(), createMoveChunkRequest(NamespaceString("TestDB", "TestColl"))); + operationContext(), createMoveRangeRequest(NamespaceString("TestDB", "TestColl"))); ASSERT_OK(scopedDonateChunk.getStatus()); // 3. Signal the registry locking thread that the registry is ready to be locked. 
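The registry changes above replace the old field-by-field comparison of a pending MoveChunkRequest with a whole-document comparison of the serialized ShardsvrMoveRange: a donation that serializes identically to the one already running joins it (its ScopedDonateChunk reports mustExecute() == false), while any difference is rejected with ConflictingOperationInProgress, as the updated tests exercise. A minimal standalone model of that join-or-conflict rule follows; `MoveRangeArgs`, `DonationRegistry`, and `RegisterOutcome` are illustrative stand-ins for the BSON serialization and ActiveMigrationsRegistry, not MongoDB types.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <tuple>

// Hypothetical stand-in for the serialized request (args.toBSON({}) in the diff).
struct MoveRangeArgs {
    std::string nss;
    std::string toShard;
    std::string min;
    std::string max;

    bool operator==(const MoveRangeArgs& other) const {
        return std::tie(nss, toShard, min, max) ==
            std::tie(other.nss, other.toShard, other.min, other.max);
    }
};

enum class RegisterOutcome { MustExecute, JoinedExisting, Conflict };

// Models registerDonateChunk(): an identical serialized request joins the in-flight
// donation; a differing one is rejected as a conflicting operation in progress.
class DonationRegistry {
public:
    RegisterOutcome registerDonate(const MoveRangeArgs& args) {
        if (!_active) {
            _active = args;
            return RegisterOutcome::MustExecute;
        }
        return (*_active == args) ? RegisterOutcome::JoinedExisting : RegisterOutcome::Conflict;
    }

    void clear() {
        _active.reset();
    }

private:
    std::optional<MoveRangeArgs> _active;
};

int main() {
    DonationRegistry registry;
    MoveRangeArgs first{"TestDB.TestColl", "shard0002", "{Key: -100}", "{Key: 100}"};

    assert(registry.registerDonate(first) == RegisterOutcome::MustExecute);
    assert(registry.registerDonate(first) == RegisterOutcome::JoinedExisting);

    MoveRangeArgs other = first;
    other.nss = "TestDB.TestColl2";
    assert(registry.registerDonate(other) == RegisterOutcome::Conflict);

    registry.clear();
    assert(registry.registerDonate(other) == RegisterOutcome::MustExecute);
    return 0;
}
```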
diff --git a/src/mongo/db/s/balancer/balancer.cpp b/src/mongo/db/s/balancer/balancer.cpp index 21b87bdba17..dc40cd77e5d 100644 --- a/src/mongo/db/s/balancer/balancer.cpp +++ b/src/mongo/db/s/balancer/balancer.cpp @@ -413,7 +413,7 @@ Status Balancer::moveRange(OperationContext* opCtx, ShardsvrMoveRange shardSvrRequest(nss); shardSvrRequest.setDbName(NamespaceString::kAdminDb); - shardSvrRequest.setMoveRangeRequest(request.getMoveRangeRequest()); + shardSvrRequest.setMoveRangeRequestBase(request.getMoveRangeRequestBase()); shardSvrRequest.setMaxChunkSizeBytes(maxChunkSize); shardSvrRequest.setFromShard(fromShardId); shardSvrRequest.setEpoch(coll.getEpoch()); @@ -982,7 +982,7 @@ int Balancer::_moveChunks(OperationContext* opCtx, return _commandScheduler->requestMoveChunk(opCtx, migrateInfo, settings); } - MoveRangeRequest requestBase(migrateInfo.to); + MoveRangeRequestBase requestBase(migrateInfo.to); requestBase.setWaitForDelete(balancerConfig->waitForDelete()); requestBase.setMin(migrateInfo.minKey); if (!feature_flags::gNoMoreAutoSplitter.isEnabled( @@ -993,7 +993,7 @@ int Balancer::_moveChunks(OperationContext* opCtx, ShardsvrMoveRange shardSvrRequest(migrateInfo.nss); shardSvrRequest.setDbName(NamespaceString::kAdminDb); - shardSvrRequest.setMoveRangeRequest(requestBase); + shardSvrRequest.setMoveRangeRequestBase(requestBase); shardSvrRequest.setMaxChunkSizeBytes(maxChunkSizeBytes); shardSvrRequest.setFromShard(migrateInfo.from); shardSvrRequest.setEpoch(coll.getEpoch()); diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp index a4ba1410014..a2f2dc56a29 100644 --- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp +++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp @@ -178,10 +178,11 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) { ShardsvrMoveRange shardsvrRequest(kNss); shardsvrRequest.setDbName(NamespaceString::kAdminDb); shardsvrRequest.setFromShard(kShardId0); - auto& moveRangeRequest = shardsvrRequest.getMoveRangeRequest(); - moveRangeRequest.setToShard(kShardId1); - moveRangeRequest.setMin({}); - moveRangeRequest.setMax({}); + shardsvrRequest.setMaxChunkSizeBytes(1024); + auto& moveRangeRequestBase = shardsvrRequest.getMoveRangeRequestBase(); + moveRangeRequestBase.setToShard(kShardId1); + moveRangeRequestBase.setMin({}); + moveRangeRequestBase.setMax({}); auto networkResponseFuture = launchAsync([&]() { onCommand( diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 1b9f825d3d9..c073ec383e6 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -225,8 +225,8 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam continue; } - auto const& minKey = cloner->_args.getMinKey(); - auto const& maxKey = cloner->_args.getMaxKey(); + auto const& minKey = cloner->_args.getMin().get(); + auto const& maxKey = cloner->_args.getMax().get(); auto const& shardKeyPattern = cloner->_shardKeyPattern; if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) { @@ -250,17 +250,20 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam } } -MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request, - const BSONObj& shardKeyPattern, - ConnectionString donorConnStr, - HostAndPort recipientHost) - : 
_args(std::move(request)), +MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy( + const ShardsvrMoveRange& request, + const WriteConcernOptions& writeConcern, + const BSONObj& shardKeyPattern, + ConnectionString donorConnStr, + HostAndPort recipientHost) + : _args(request), + _writeConcern(writeConcern), _shardKeyPattern(shardKeyPattern), - _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), - _args.getToShardId().toString())), + _sessionId(MigrationSessionId::generate(_args.getFromShard().toString(), + _args.getToShard().toString())), _donorConnStr(std::move(donorConnStr)), _recipientHost(std::move(recipientHost)), - _forceJumbo(_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kDoNotForce) {} + _forceJumbo(_args.getForceJumbo() != ForceJumbo::kDoNotForce) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -276,10 +279,7 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx, auto const replCoord = repl::ReplicationCoordinator::get(opCtx); if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { _sessionCatalogSource = std::make_unique<SessionCatalogMigrationSource>( - opCtx, - _args.getNss(), - ChunkRange(_args.getMinKey(), _args.getMaxKey()), - _shardKeyPattern.getKeyPattern()); + opCtx, nss(), ChunkRange(getMin(), getMax()), _shardKeyPattern.getKeyPattern()); // Prime up the session migration source if there are oplog entries to migrate. _sessionCatalogSource->fetchNextOplog(opCtx); @@ -310,19 +310,24 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx, // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; + const bool isThrottled = _args.getSecondaryThrottle(); + MigrationSecondaryThrottleOptions secondaryThrottleOptions = isThrottled + ? MigrationSecondaryThrottleOptions::createWithWriteConcern(_writeConcern) + : MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kOff); + StartChunkCloneRequest::appendAsCommand(&cmdBuilder, - _args.getNss(), + nss(), migrationId, lsid, txnNumber, _sessionId, _donorConnStr, - _args.getFromShardId(), - _args.getToShardId(), - _args.getMinKey(), - _args.getMaxKey(), + _args.getFromShard(), + _args.getToShard(), + getMin(), + getMax(), _shardKeyPattern.toBSON(), - _args.getSecondaryThrottle()); + secondaryThrottleOptions); // Commands sent to shards that accept writeConcern, must always have writeConcern. So if the // StartChunkCloneRequest didn't add writeConcern (from secondaryThrottle), then we add the @@ -355,8 +360,7 @@ Status MigrationChunkClonerSourceLegacy::awaitUntilCriticalSectionIsAppropriate( invariant(!opCtx->lockState()->isLocked()); // If this migration is manual migration that specified "force", enter the critical section // immediately. This means the entire cloning phase will be done under the critical section. 
- if (_jumboChunkCloneState && - _args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) { + if (_jumboChunkCloneState && _args.getForceJumbo() == ForceJumbo::kForceManual) { return Status::OK(); } @@ -368,7 +372,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte invariant(_state == kCloning); invariant(!opCtx->lockState()->isLocked()); if (_jumboChunkCloneState && _forceJumbo) { - if (_args.getForceJumbo() == MoveChunkRequest::ForceJumbo::kForceManual) { + if (_args.getForceJumbo() == ForceJumbo::kForceManual) { auto status = _checkRecipientCloningStatus(opCtx, kMaxWaitToCommitCloneForJumboChunk); if (!status.isOK()) { return status; @@ -385,7 +389,7 @@ StatusWith<BSONObj> MigrationChunkClonerSourceLegacy::commitClone(OperationConte auto responseStatus = _callRecipient(opCtx, [&] { BSONObjBuilder builder; - builder.append(kRecvChunkCommit, _args.getNss().ns()); + builder.append(kRecvChunkCommit, nss().ns()); builder.append("acquireCSOnRecipient", acquireCSOnRecipient); _sessionId.append(&builder); return builder.obj(); @@ -419,8 +423,8 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) noex break; case kCloning: { const auto status = - _callRecipient( - opCtx, createRequestWithSessionId(kRecvChunkAbort, _args.getNss(), _sessionId)) + _callRecipient(opCtx, + createRequestWithSessionId(kRecvChunkAbort, nss(), _sessionId)) .getStatus(); if (!status.isOK()) { LOGV2(21991, @@ -439,13 +443,13 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) noex } bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& doc) { - return isInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern); + return isInRange(doc, getMin(), getMax(), _shardKeyPattern); } void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const repl::OpTime& opTime) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IX)); BSONElement idElement = insertedDoc["_id"]; if (idElement.eoo()) { @@ -458,7 +462,7 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, return; } - if (!isInRange(insertedDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { + if (!isInRange(insertedDoc, getMin(), getMax(), _shardKeyPattern)) { return; } @@ -480,7 +484,7 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, const BSONObj& postImageDoc, const repl::OpTime& opTime, const repl::OpTime& prePostImageOpTime) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IX)); BSONElement idElement = postImageDoc["_id"]; if (idElement.eoo()) { @@ -493,13 +497,12 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, return; } - if (!isInRange(postImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { + if (!isInRange(postImageDoc, getMin(), getMax(), _shardKeyPattern)) { // If the preImageDoc is not in range but the postImageDoc was, we know that the document // has changed shard keys and no longer belongs in the chunk being cloned. We will model // the deletion of the preImage document so that the destination chunk does not receive an // outdated version of this document. 
- if (preImageDoc && - isInRange(*preImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { + if (preImageDoc && isInRange(*preImageDoc, getMin(), getMax(), _shardKeyPattern)) { onDeleteOp(opCtx, *preImageDoc, opTime, prePostImageOpTime); } return; @@ -522,7 +525,7 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId, const repl::OpTime& opTime, const repl::OpTime&) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IX)); + dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IX)); BSONElement idElement = deletedDocId["_id"]; if (idElement.eoo()) { @@ -728,7 +731,7 @@ uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() { Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, const CollectionPtr& collection, BSONArrayBuilder* arrBuilder) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS)); + dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS)); // If this chunk is too large to store records in _cloneLocs and the command args specify to // attempt to move it, scan the collection directly. @@ -748,7 +751,7 @@ Status MigrationChunkClonerSourceLegacy::nextCloneBatch(OperationContext* opCtx, Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder) { - dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IS)); + dassert(opCtx->lockState()->isCollectionLockedForMode(nss(), MODE_IS)); std::list<BSONObj> deleteList; std::list<BSONObj> updateList; @@ -768,7 +771,7 @@ Status MigrationChunkClonerSourceLegacy::nextModsBatch(OperationContext* opCtx, updateList.splice(updateList.cbegin(), _reload); } - StringData ns = _args.getNss().ns().c_str(); + StringData ns = nss().ns().c_str(); BSONArrayBuilder arrDel(builder->subarrayStart("deleted")); auto noopFn = [](BSONObj idDoc, BSONObj* fullDoc) { *fullDoc = idDoc; @@ -869,14 +872,14 @@ MigrationChunkClonerSourceLegacy::_getIndexScanExecutor( if (!shardKeyIdx) { return {ErrorCodes::IndexNotFound, str::stream() << "can't find index with prefix " << _shardKeyPattern.toBSON() - << " in storeCurrentLocs for " << _args.getNss().ns()}; + << " in storeCurrentLocs for " << nss().ns()}; } // Assume both min and max non-empty, append MinKey's to make them fit chosen index const KeyPattern kp(shardKeyIdx->keyPattern()); - BSONObj min = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMinKey(), false)); - BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(_args.getMaxKey(), false)); + BSONObj min = Helpers::toKeyFormat(kp.extendRangeBound(getMin(), false)); + BSONObj max = Helpers::toKeyFormat(kp.extendRangeBound(getMax(), false)); // We can afford to yield here because any change to the base data that we might miss is already // being queued and will migrate in the 'transferMods' stage. 
@@ -892,10 +895,10 @@ MigrationChunkClonerSourceLegacy::_getIndexScanExecutor( } Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opCtx) { - AutoGetCollection collection(opCtx, _args.getNss(), MODE_IS); + AutoGetCollection collection(opCtx, nss(), MODE_IS); if (!collection) { return {ErrorCodes::NamespaceNotFound, - str::stream() << "Collection " << _args.getNss().ns() << " does not exist."}; + str::stream() << "Collection " << nss().ns() << " does not exist."}; } auto swExec = _getIndexScanExecutor( @@ -971,7 +974,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC if (!idIdx) { return {ErrorCodes::IndexNotFound, str::stream() << "can't find index '_id' in storeCurrentLocs for " - << _args.getNss().ns()}; + << nss().ns()}; } averageObjectIdSize = idIdx->accessMethod()->getSpaceUsedBytes(opCtx) / totalRecs; } @@ -983,8 +986,7 @@ Status MigrationChunkClonerSourceLegacy::_storeCurrentLocs(OperationContext* opC << maxRecsWhenFull << ", the maximum chunk size is " << _args.getMaxChunkSizeBytes() << ", average document size is " << avgRecSize << ". Found " << recCount << " documents in chunk " - << " ns: " << _args.getNss().ns() << " " << _args.getMinKey() << " -> " - << _args.getMaxKey()}; + << " ns: " << nss().ns() << " " << getMin() << " -> " << getMax()}; } stdx::lock_guard<Latch> lk(_mutex); @@ -1034,7 +1036,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC int iteration = 0; while ((Date_t::now() - startTime) < maxTimeToWait) { auto responseStatus = _callRecipient( - opCtx, createRequestWithSessionId(kRecvChunkStatus, _args.getNss(), _sessionId, true)); + opCtx, createRequestWithSessionId(kRecvChunkStatus, nss(), _sessionId, true)); if (!responseStatus.isOK()) { return responseStatus.getStatus().withContext( "Failed to contact recipient shard to monitor data transfer"); @@ -1142,12 +1144,12 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC << migrationSessionIdStatus.getStatus().toString()}; } - if (res["ns"].str() != _args.getNss().ns() || + if (res["ns"].str() != nss().ns() || (res.hasField("fromShardId") - ? (res["fromShardId"].str() != _args.getFromShardId().toString()) + ? (res["fromShardId"].str() != _args.getFromShard().toString()) : (res["from"].str() != _donorConnStr.toString())) || - !res["min"].isABSONObj() || res["min"].Obj().woCompare(_args.getMinKey()) != 0 || - !res["max"].isABSONObj() || res["max"].Obj().woCompare(_args.getMaxKey()) != 0 || + !res["min"].isABSONObj() || res["min"].Obj().woCompare(getMin()) != 0 || + !res["max"].isABSONObj() || res["max"].Obj().woCompare(getMax()) != 0 || !_sessionId.matches(migrationSessionIdStatus.getValue())) { // This can happen when the destination aborted the migration and received another // recvChunk before this thread sees the transition to the abort state. 
This is @@ -1158,7 +1160,7 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC "Destination shard aborted migration because a new one is running"}; } - if (_args.getForceJumbo() != MoveChunkRequest::ForceJumbo::kForceManual && + if (_args.getForceJumbo() != ForceJumbo::kForceManual && (_memoryUsed > 500 * 1024 * 1024 || (_jumboChunkCloneState && MONGO_unlikely(failTooMuchMemoryUsed.shouldFail())))) { // This is too much memory for us to use so we're going to abort the migration @@ -1182,7 +1184,7 @@ boost::optional<repl::OpTime> MigrationChunkClonerSourceLegacy::nextSessionMigra } repl::OpTime opTimeToWaitIfWaitingForMajority; - const ChunkRange range(_args.getMinKey(), _args.getMaxKey()); + const ChunkRange range(getMin(), getMax()); while (_sessionCatalogSource->hasMoreOplog()) { auto result = _sessionCatalogSource->getLastFetchedOplog(); diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 45bcdb12404..8c15fa7a0cb 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -42,7 +42,7 @@ #include "mongo/db/s/migration_session_id.h" #include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/platform/mutex.h" -#include "mongo/s/request_types/move_chunk_request.h" +#include "mongo/s/request_types/move_range_request_gen.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/condition_variable.h" #include "mongo/util/net/hostandport.h" @@ -88,7 +88,8 @@ class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource MigrationChunkClonerSourceLegacy& operator=(const MigrationChunkClonerSourceLegacy&) = delete; public: - MigrationChunkClonerSourceLegacy(MoveChunkRequest request, + MigrationChunkClonerSourceLegacy(const ShardsvrMoveRange& request, + const WriteConcernOptions& writeConcern, const BSONObj& shardKeyPattern, ConnectionString donorConnStr, HostAndPort recipientHost); @@ -205,6 +206,20 @@ public: */ std::shared_ptr<Notification<bool>> getNotificationForNextSessionMigrationBatch(); + const NamespaceString& nss() { + return _args.getCommandParameter(); + } + + const BSONObj& getMin() { + invariant(_args.getMin()); + return *_args.getMin(); + } + + const BSONObj& getMax() { + invariant(_args.getMax()); + return *_args.getMax(); + } + private: friend class LogOpForShardingHandler; friend class LogTransactionOperationsForShardingHandler; @@ -307,8 +322,11 @@ private: */ Status _checkRecipientCloningStatus(OperationContext* opCtx, Milliseconds maxTimeToWait); - // The original move chunk request - const MoveChunkRequest _args; + // The original move range request + const ShardsvrMoveRange _args; + + // The write concern associated with the move range + const WriteConcernOptions _writeConcern; // The shard key associated with the namespace const ShardKeyPattern _shardKeyPattern; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp index 0ec28d4b862..623524a545d 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp @@ -200,21 +200,15 @@ protected: * Shortcut to create BSON represenation of a moveChunk request for the specified range with * fixed kDonorConnStr and kRecipientConnStr, respectively. 
*/ - static MoveChunkRequest createMoveChunkRequest(const ChunkRange& chunkRange) { - BSONObjBuilder cmdBuilder; - MoveChunkRequest::appendAsCommand( - &cmdBuilder, - kNss, - ChunkVersion(1, 0, OID::gen(), Timestamp(1, 1)), - kDonorConnStr.getSetName(), - kRecipientConnStr.getSetName(), - chunkRange, - 1024 * 1024, - MigrationSecondaryThrottleOptions::create(MigrationSecondaryThrottleOptions::kDefault), - false, - MoveChunkRequest::ForceJumbo::kDoNotForce); - - return assertGet(MoveChunkRequest::createFromCommand(kNss, cmdBuilder.obj())); + static ShardsvrMoveRange createMoveRangeRequest(const ChunkRange& chunkRange) { + ShardsvrMoveRange req(kNss); + req.setEpoch(OID::gen()); + req.setFromShard(ShardId(kDonorConnStr.getSetName())); + req.setMaxChunkSizeBytes(1024); + req.getMoveRangeRequestBase().setToShard(ShardId(kRecipientConnStr.getSetName())); + req.getMoveRangeRequestBase().setMin(chunkRange.getMin()); + req.getMoveRangeRequestBase().setMax(chunkRange.getMax()); + return req; } /** @@ -278,11 +272,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) { createShardedCollection(contents); - MigrationChunkClonerSourceLegacy cloner( - createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), - kShardKeyPattern, - kDonorConnStr, - kRecipientConnStr.getServers()[0]); + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); { auto futureStartClone = launchAsync([&]() { @@ -387,11 +383,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, RemoveDuplicateDocuments) { createShardedCollection(contents); - MigrationChunkClonerSourceLegacy cloner( - createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), - kShardKeyPattern, - kDonorConnStr, - kRecipientConnStr.getServers()[0]); + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); { auto futureStartClone = launchAsync([&]() { @@ -479,11 +477,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, OneLargeDocumentTransferMods) { createShardedCollection(contents); - MigrationChunkClonerSourceLegacy cloner( - createMoveChunkRequest(ChunkRange(BSON("X" << 1), BSON("X" << 100))), - kShardKeyPattern, - kDonorConnStr, - kRecipientConnStr.getServers()[0]); + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 100))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); { auto futureStartClone = launchAsync([&]() { @@ -539,11 +539,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) { createShardedCollection(contents); - MigrationChunkClonerSourceLegacy cloner( - createMoveChunkRequest(ChunkRange(BSON("X" << 1), BSON("X" << 1000000))), - kShardKeyPattern, - kDonorConnStr, - kRecipientConnStr.getServers()[0]); + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 1), BSON("X" << 1000000))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); { auto futureStartClone = launchAsync([&]() { @@ -610,11 +612,13 @@ 
TEST_F(MigrationChunkClonerSourceLegacyTest, ManySmallDocumentsTransferMods) { } TEST_F(MigrationChunkClonerSourceLegacyTest, CollectionNotFound) { - MigrationChunkClonerSourceLegacy cloner( - createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), - kShardKeyPattern, - kDonorConnStr, - kRecipientConnStr.getServers()[0]); + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber)); cloner.cancelClone(operationContext()); @@ -628,11 +632,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, ShardKeyIndexNotFound) { operationContext(), kNss.db().toString(), BSON("create" << kNss.coll()))); } - MigrationChunkClonerSourceLegacy cloner( - createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), - kShardKeyPattern, - kDonorConnStr, - kRecipientConnStr.getServers()[0]); + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); ASSERT_NOT_OK(cloner.startClone(operationContext(), UUID::gen(), _lsid, _txnNumber)); cloner.cancelClone(operationContext()); @@ -646,11 +652,13 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, FailedToEngageRecipientShard) { createShardedCollection(contents); - MigrationChunkClonerSourceLegacy cloner( - createMoveChunkRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))), - kShardKeyPattern, - kDonorConnStr, - kRecipientConnStr.getServers()[0]); + const ShardsvrMoveRange req = + createMoveRangeRequest(ChunkRange(BSON("X" << 100), BSON("X" << 200))); + MigrationChunkClonerSourceLegacy cloner(req, + WriteConcernOptions(), + kShardKeyPattern, + kDonorConnStr, + kRecipientConnStr.getServers()[0]); { auto futureStartClone = launchAsync([&]() { diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 72791d7d7a0..35f4d814295 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -40,6 +40,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/read_concern.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/s/auto_split_vector.h" #include "mongo/db/s/migration_chunk_cloner_source_legacy.h" #include "mongo/db/s/migration_coordinator.h" #include "mongo/db/s/migration_util.h" @@ -106,6 +107,28 @@ void refreshRecipientRoutingTable(OperationContext* opCtx, executor->scheduleRemoteCommand(request, noOp).getStatus().ignore(); } + +/* + * Taking into account the provided max chunk size, returns: + * - A `max` bound to perform split+move in case the chunk owning `min` is splittable. + * - The `max` bound of the chunk owning `min in case it can't be split (too small or jumbo). 
+ */ +BSONObj computeMaxBound(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& min, + const Chunk& owningChunk, + const ShardKeyPattern& skPattern, + const long long maxChunkSizeBytes) { + // TODO SERVER-64926 do not assume min always present + auto [splitKeys, _] = autoSplitVector( + opCtx, nss, skPattern.toBSON(), min, owningChunk.getMax(), maxChunkSizeBytes, 1); + if (splitKeys.size()) { + return std::move(splitKeys.front()); + } + + return owningChunk.getMax(); +} + MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep1); MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep2); MONGO_FAIL_POINT_DEFINE(moveChunkHangAtStep3); @@ -135,44 +158,47 @@ std::shared_ptr<MigrationChunkClonerSource> MigrationSourceManager::getCurrentCl } MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, - MoveChunkRequest request, + ShardsvrMoveRange&& request, + WriteConcernOptions&& writeConcern, ConnectionString donorConnStr, HostAndPort recipientHost) : _opCtx(opCtx), - _args(std::move(request)), + _args(request), + _writeConcern(writeConcern), _donorConnStr(std::move(donorConnStr)), _recipientHost(std::move(recipientHost)), _stats(ShardingStatistics::get(_opCtx)), _critSecReason(BSON("command" << "moveChunk" - << "fromShard" << _args.getFromShardId() << "toShard" - << _args.getToShardId())), + << "fromShard" << _args.getFromShard() << "toShard" + << _args.getToShard())), _acquireCSOnRecipient(feature_flags::gFeatureFlagMigrationRecipientCriticalSection.isEnabled( serverGlobalParams.featureCompatibility)), _moveTimingHelper(_opCtx, "from", - _args.getNss().ns(), - _args.getMinKey(), - _args.getMaxKey(), + _args.getCommandParameter().ns(), + _args.getMin(), + _args.getMax(), 6, // Total number of steps &kEmptyErrMsgForMoveTimingHelper, - _args.getToShardId(), - _args.getFromShardId()) { + _args.getToShard(), + _args.getFromShard()) { invariant(!_opCtx->lockState()->isLocked()); LOGV2(22016, "Starting chunk migration donation {requestParameters} with expected collection epoch " "{collectionEpoch}", "Starting chunk migration donation", - "requestParameters"_attr = redact(_args.toString()), - "collectionEpoch"_attr = _args.getVersionEpoch()); + "requestParameters"_attr = redact(_args.toBSON({})), + "collectionEpoch"_attr = _args.getEpoch()); _moveTimingHelper.done(1); moveChunkHangAtStep1.pauseWhileSet(); // Make sure the latest shard version is recovered as of the time of the invocation of the // command. 
- onShardVersionMismatch(_opCtx, _args.getNss(), boost::none); + onShardVersionMismatch(_opCtx, nss(), boost::none); + const auto shardId = ShardingState::get(opCtx)->shardId(); // Complete any unfinished migration pending recovery @@ -190,18 +216,18 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, // Snapshot the committed metadata from the time the migration starts const auto [collectionMetadata, collectionUUID] = [&] { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IS); + AutoGetCollection autoColl(_opCtx, nss(), MODE_IS); uassert(ErrorCodes::InvalidOptions, "cannot move chunks for a collection that doesn't exist", autoColl.getCollection()); UUID collectionUUID = autoColl.getCollection()->uuid(); - auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss()); + auto* const csr = CollectionShardingRuntime::get(_opCtx, nss()); const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); auto optMetadata = csr->getCurrentMetadataIfKnown(); - uassert(StaleConfigInfo(_args.getNss(), + uassert(StaleConfigInfo(nss(), ChunkVersion::IGNORED() /* receivedVersion */, boost::none /* wantedVersion */, shardId, @@ -210,7 +236,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, optMetadata); auto& metadata = *optMetadata; - uassert(StaleConfigInfo(_args.getNss(), + uassert(StaleConfigInfo(nss(), ChunkVersion::IGNORED() /* receivedVersion */, ChunkVersion::UNSHARDED() /* wantedVersion */, shardId, @@ -234,36 +260,52 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, const auto collectionVersion = collectionMetadata.getCollVersion(); const auto shardVersion = collectionMetadata.getShardVersion(); - uassert(StaleConfigInfo(_args.getNss(), + uassert(StaleConfigInfo(nss(), ChunkVersion::IGNORED() /* receivedVersion */, shardVersion /* wantedVersion */, shardId, boost::none), - str::stream() << "cannot move chunk " << _args.toString() + str::stream() << "cannot move chunk " << _args.toBSON({}) << " because collection may have been dropped. 
" << "current epoch: " << collectionVersion.epoch() - << ", cmd epoch: " << _args.getVersionEpoch(), - _args.getVersionEpoch() == collectionVersion.epoch()); + << ", cmd epoch: " << _args.getEpoch(), + _args.getEpoch() == collectionVersion.epoch()); - uassert(StaleConfigInfo(_args.getNss(), + uassert(StaleConfigInfo(nss(), ChunkVersion::IGNORED() /* receivedVersion */, shardVersion /* wantedVersion */, shardId, boost::none), - str::stream() << "cannot move chunk " << _args.toString() + str::stream() << "cannot move chunk " << _args.toBSON({}) << " because the shard doesn't contain any chunks", shardVersion.majorVersion() > 0); + // Compute the max bound in case only `min` is set (moveRange) + if (!_args.getMax().is_initialized()) { + // TODO SERVER-64926 do not assume min always present + const auto& min = *_args.getMin(); + + const auto cm = collectionMetadata.getChunkManager(); + const auto owningChunk = cm->findIntersectingChunkWithSimpleCollation(min); + const auto max = computeMaxBound(_opCtx, + nss(), + min, + owningChunk, + cm->getShardKeyPattern(), + _args.getMaxChunkSizeBytes()); + _args.getMoveRangeRequestBase().setMax(max); + _moveTimingHelper.setMax(max); + } + const auto& keyPattern = collectionMetadata.getKeyPattern(); const bool validBounds = [&]() { // Return true if provided bounds are respecting the shard key format, false otherwise const auto nFields = keyPattern.nFields(); - if (nFields != _args.getMinKey().nFields() || nFields != _args.getMaxKey().nFields()) { + if (nFields != (*_args.getMin()).nFields() || nFields != (*_args.getMax()).nFields()) { return false; } - BSONObjIterator keyPatternIt(keyPattern), minIt(_args.getMinKey()), - maxIt(_args.getMaxKey()); + BSONObjIterator keyPatternIt(keyPattern), minIt(*_args.getMin()), maxIt(*_args.getMax()); while (keyPatternIt.more()) { const auto keyPatternField = keyPatternIt.next().fieldNameStringData(); @@ -277,43 +319,43 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, return true; }(); - uassert(StaleConfigInfo(_args.getNss(), + uassert(StaleConfigInfo(nss(), ChunkVersion::IGNORED() /* receivedVersion */, shardVersion /* wantedVersion */, shardId, boost::none), str::stream() << "Range bounds do not match the shard key pattern. KeyPattern: " << keyPattern.toString() << " - Bounds: " - << ChunkRange(_args.getMinKey(), _args.getMaxKey()).toString() << ".", + << ChunkRange(*_args.getMin(), *_args.getMax()).toString() << ".", validBounds); ChunkType existingChunk; - uassert(StaleConfigInfo(_args.getNss(), + uassert(StaleConfigInfo(nss(), ChunkVersion::IGNORED() /* receivedVersion */, shardVersion /* wantedVersion */, shardId, boost::none), str::stream() << "Range with bounds " - << ChunkRange(_args.getMinKey(), _args.getMaxKey()).toString() + << ChunkRange(*_args.getMin(), *_args.getMax()).toString() << " is not owned by this shard.", - collectionMetadata.getNextChunk(_args.getMinKey(), &existingChunk)); + collectionMetadata.getNextChunk(*_args.getMin(), &existingChunk)); - uassert(StaleConfigInfo(_args.getNss(), + uassert(StaleConfigInfo(nss(), ChunkVersion::IGNORED() /* receivedVersion */, shardVersion /* wantedVersion */, shardId, boost::none), str::stream() << "Unable to move range with bounds " - << ChunkRange(_args.getMinKey(), _args.getMaxKey()).toString() + << ChunkRange(*_args.getMin(), *_args.getMax()).toString() << " . 
The closest owned chunk is " << ChunkRange(existingChunk.getMin(), existingChunk.getMax()).toString(), - existingChunk.getRange().covers(ChunkRange(_args.getMinKey(), _args.getMaxKey()))); + existingChunk.getRange().covers(ChunkRange(*_args.getMin(), *_args.getMax()))); _collectionEpoch = collectionVersion.epoch(); _collectionUUID = collectionUUID; _chunkVersion = collectionMetadata.getChunkManager() - ->findIntersectingChunkWithSimpleCollation(_args.getMinKey()) + ->findIntersectingChunkWithSimpleCollation(*_args.getMin()) .getLastmod(); _moveTimingHelper.done(2); @@ -336,9 +378,9 @@ void MigrationSourceManager::startClone() { uassertStatusOK(ShardingLogging::get(_opCtx)->logChangeChecked( _opCtx, "moveChunk.start", - _args.getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" - << _args.getFromShardId() << "to" << _args.getToShardId()), + nss().ns(), + BSON("min" << *_args.getMin() << "max" << *_args.getMax() << "from" << _args.getFromShard() + << "to" << _args.getToShard()), ShardingCatalogClient::kMajorityWriteConcern)); _cloneAndCommitTimer.reset(); @@ -350,13 +392,13 @@ void MigrationSourceManager::startClone() { const auto metadata = _getCurrentMetadataAndCheckEpoch(); AutoGetCollection autoColl(_opCtx, - _args.getNss(), + nss(), replEnabled ? MODE_IX : MODE_X, AutoGetCollectionViewMode::kViewsForbidden, _opCtx->getServiceContext()->getPreciseClockSource()->now() + Milliseconds(migrationLockAcquisitionMaxWaitMS.load())); - auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss()); + auto* const csr = CollectionShardingRuntime::get(_opCtx, nss()); const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); // Having the metadata manager registered on the collection sharding state is what indicates @@ -364,14 +406,14 @@ void MigrationSourceManager::startClone() { // migration, write operations require the cloner to be present in order to track changes to // the chunk which needs to be transmitted to the recipient. _cloneDriver = std::make_shared<MigrationChunkClonerSourceLegacy>( - _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost); + _args, _writeConcern, metadata.getKeyPattern(), _donorConnStr, _recipientHost); _coordinator.emplace(_cloneDriver->getSessionId(), - _args.getFromShardId(), - _args.getToShardId(), - _args.getNss(), + _args.getFromShard(), + _args.getToShard(), + nss(), *_collectionUUID, - ChunkRange(_args.getMinKey(), _args.getMaxKey()), + ChunkRange(*_args.getMin(), *_args.getMax()), *_chunkVersion, _args.getWaitForDelete()); @@ -427,9 +469,9 @@ void MigrationSourceManager::enterCriticalSection() { // Check that there are no chunks on the recepient shard. Write an oplog event for change // streams if this is the first migration to the recipient. - if (!metadata.getChunkManager()->getVersion(_args.getToShardId()).isSet()) { + if (!metadata.getChunkManager()->getVersion(_args.getToShard()).isSet()) { migrationutil::notifyChangeStreamsOnRecipientFirstChunk( - _opCtx, _args.getNss(), _args.getFromShardId(), _args.getToShardId(), _collectionUUID); + _opCtx, nss(), _args.getFromShard(), _args.getToShard(), _collectionUUID); } // Mark the shard as running critical operation, which requires recovery on crash. 
@@ -445,7 +487,7 @@ void MigrationSourceManager::enterCriticalSection() { "Starting critical section", "migrationId"_attr = _coordinator->getMigrationId()); - _critSec.emplace(_opCtx, _args.getNss(), _critSecReason); + _critSec.emplace(_opCtx, nss(), _critSecReason); _state = kCriticalSection; @@ -457,7 +499,7 @@ void MigrationSourceManager::enterCriticalSection() { // will stall behind the flag. Status signalStatus = shardmetadatautil::updateShardCollectionsEntry( _opCtx, - BSON(ShardCollectionType::kNssFieldName << _args.getNss().ns()), + BSON(ShardCollectionType::kNssFieldName << nss().ns()), BSON("$inc" << BSON(ShardCollectionType::kEnterCriticalSectionCounterFieldName << 1)), false /*upsert*/); if (!signalStatus.isOK()) { @@ -479,7 +521,8 @@ void MigrationSourceManager::commitChunkOnRecipient() { invariant(_state == kCriticalSection); ScopeGuard scopedGuard([&] { _cleanupOnError(); - migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, _args.getNss()); + migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, + _args.getCommandParameter()); }); // Tell the recipient shard to fetch the latest changes. @@ -502,9 +545,10 @@ void MigrationSourceManager::commitChunkOnRecipient() { void MigrationSourceManager::commitChunkMetadataOnConfig() { invariant(!_opCtx->lockState()->isLocked()); invariant(_state == kCloneCompleted); + ScopeGuard scopedGuard([&] { _cleanupOnError(); - migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, _args.getNss()); + migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, nss()); }); // If we have chunks left on the FROM shard, bump the version of one of them as well. This will @@ -516,15 +560,15 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { const auto metadata = _getCurrentMetadataAndCheckEpoch(); ChunkType migratedChunkType; - migratedChunkType.setMin(_args.getMinKey()); - migratedChunkType.setMax(_args.getMaxKey()); + migratedChunkType.setMin(*_args.getMin()); + migratedChunkType.setMax(*_args.getMax()); migratedChunkType.setVersion(*_chunkVersion); const auto currentTime = VectorClock::get(_opCtx)->getTime(); CommitChunkMigrationRequest::appendAsCommand(&builder, - _args.getNss(), - _args.getFromShardId(), - _args.getToShardId(), + nss(), + _args.getFromShard(), + _args.getToShard(), migratedChunkType, metadata.getCollVersion(), currentTime.clusterTime().asTimestamp()); @@ -559,12 +603,12 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { if (!migrationCommitStatus.isOK()) { { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX); - CollectionShardingRuntime::get(_opCtx, _args.getNss())->clearFilteringMetadata(_opCtx); + AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); + CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx); } scopedGuard.dismiss(); _cleanup(false); - migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, _args.getNss()); + migrationutil::asyncRecoverMigrationUntilSuccessOrStepDown(_opCtx, nss()); uassertStatusOK(migrationCommitStatus); } @@ -582,7 +626,7 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { "Starting post-migration commit refresh on the shard", "migrationId"_attr = _coordinator->getMigrationId()); - forceShardFilteringMetadataRefresh(_opCtx, _args.getNss()); + forceShardFilteringMetadataRefresh(_opCtx, nss()); LOGV2_DEBUG_OPTIONS(4817405, 2, @@ -598,13 +642,13 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { 
"error"_attr = redact(ex)); { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX); - CollectionShardingRuntime::get(_opCtx, _args.getNss())->clearFilteringMetadata(_opCtx); + AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); + CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx); } scopedGuard.dismiss(); _cleanup(false); // Best-effort recover of the shard version. - onShardVersionMismatchNoExcept(_opCtx, _args.getNss(), boost::none).ignore(); + onShardVersionMismatchNoExcept(_opCtx, nss(), boost::none).ignore(); throw; } @@ -613,9 +657,9 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch(); // Check if there are no chunks left on donor shard. Write an oplog event for change streams if // the last chunk migrated off the donor. - if (!refreshedMetadata.getChunkManager()->getVersion(_args.getFromShardId()).isSet()) { + if (!refreshedMetadata.getChunkManager()->getVersion(_args.getFromShard()).isSet()) { migrationutil::notifyChangeStreamsOnDonorLastChunk( - _opCtx, _args.getNss(), _args.getFromShardId(), _collectionUUID); + _opCtx, nss(), _args.getFromShard(), _collectionUUID); } @@ -627,9 +671,9 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { // If the migration has succeeded, clear the BucketCatalog so that the buckets that got migrated // out are no longer updatable. - if (_args.getNss().isTimeseriesBucketsCollection()) { + if (nss().isTimeseriesBucketsCollection()) { auto& bucketCatalog = BucketCatalog::get(_opCtx); - bucketCatalog.clear(_args.getNss().getTimeseriesViewNamespace()); + bucketCatalog.clear(nss().getTimeseriesViewNamespace()); } _coordinator->setMigrationDecision(DecisionEnum::kCommitted); @@ -652,23 +696,22 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { ShardingLogging::get(_opCtx)->logChange( _opCtx, "moveChunk.commit", - _args.getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" - << _args.getFromShardId() << "to" << _args.getToShardId() << "counts" - << *_recipientCloneCounts), + nss().ns(), + BSON("min" << *_args.getMin() << "max" << *_args.getMax() << "from" << _args.getFromShard() + << "to" << _args.getToShard() << "counts" << *_recipientCloneCounts), ShardingCatalogClient::kMajorityWriteConcern); - const ChunkRange range(_args.getMinKey(), _args.getMaxKey()); + const ChunkRange range(*_args.getMin(), *_args.getMax()); if (!_acquireCSOnRecipient && !MONGO_unlikely(doNotRefreshRecipientAfterCommit.shouldFail())) { // Best-effort make the recipient refresh its routing table to the new collection // version. 
refreshRecipientRoutingTable( - _opCtx, _args.getNss(), _recipientHost, refreshedMetadata.getCollVersion()); + _opCtx, nss(), _recipientHost, refreshedMetadata.getCollVersion()); } std::string orphanedRangeCleanUpErrMsg = str::stream() - << "Moved chunks successfully but failed to clean up " << _args.getNss() << " range " + << "Moved chunks successfully but failed to clean up " << nss() << " range " << redact(range.toString()) << " due to: "; if (_args.getWaitForDelete()) { @@ -676,7 +719,7 @@ void MigrationSourceManager::commitChunkMetadataOnConfig() { "Waiting for migration cleanup after chunk commit for the namespace {namespace} " "and range {range}", "Waiting for migration cleanup after chunk commit", - "namespace"_attr = _args.getNss(), + "namespace"_attr = nss(), "range"_attr = redact(range.toString()), "migrationId"_attr = _coordinator->getMigrationId()); @@ -703,9 +746,9 @@ void MigrationSourceManager::_cleanupOnError() noexcept { ShardingLogging::get(_opCtx)->logChange( _opCtx, "moveChunk.error", - _args.getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" - << _args.getFromShardId() << "to" << _args.getToShardId()), + _args.getCommandParameter().ns(), + BSON("min" << *_args.getMin() << "max" << *_args.getMax() << "from" << _args.getFromShard() + << "to" << _args.getToShard()), ShardingCatalogClient::kMajorityWriteConcern); _cleanup(true); @@ -722,8 +765,8 @@ SharedSemiFuture<void> MigrationSourceManager::abort() { CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() { auto metadata = [&] { UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IS); - auto* const css = CollectionShardingRuntime::get(_opCtx, _args.getNss()); + AutoGetCollection autoColl(_opCtx, _args.getCommandParameter(), MODE_IS); + auto* const css = CollectionShardingRuntime::get(_opCtx, _args.getCommandParameter()); const auto optMetadata = css->getCurrentMetadataIfKnown(); uassert(ErrorCodes::ConflictingOperationInProgress, @@ -749,8 +792,8 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { auto cloneDriver = [&]() { // Unregister from the collection's sharding state and exit the migration critical section. UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX); - auto* const csr = CollectionShardingRuntime::get(_opCtx, _args.getNss()); + AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); + auto* const csr = CollectionShardingRuntime::get(_opCtx, nss()); const auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr); if (_state != kCreated) { @@ -806,7 +849,7 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { // is possible that the persisted metadata is rolled back after step down, but the // write which cleared the 'inMigration' flag is not, a secondary node will report // itself at an older shard version. - CatalogCacheLoader::get(newOpCtx).waitForCollectionFlush(newOpCtx, _args.getNss()); + CatalogCacheLoader::get(newOpCtx).waitForCollectionFlush(newOpCtx, nss()); // Clear the 'minOpTime recovery' document so that the next time a node from this // shard becomes a primary, it won't have to recover the config server optime. 
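The hunks above consistently swap the old MoveChunkRequest accessors for the IDL-generated ShardsvrMoveRange ones: the namespace now comes from getCommandParameter() (wrapped by the new nss() helper introduced further down in migration_source_manager.h), the bounds from the optional getMin()/getMax(), and the shard ids from getFromShard()/getToShard(). A minimal sketch of how a commit log payload would be assembled with the new getters — assuming, as the surrounding code does, that both optional bounds have already been resolved by the time the donor commits; buildCommitLogPayload is an illustrative name, not part of this patch:

// Illustrative sketch only; presumes min/max are engaged (resolved before commit).
BSONObj buildCommitLogPayload(const ShardsvrMoveRange& args, const BSONObj& recipientCounts) {
    return BSON("min" << *args.getMin() << "max" << *args.getMax()
                      << "from" << args.getFromShard() << "to" << args.getToShard()
                      << "counts" << recipientCounts);
}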
@@ -827,24 +870,24 @@ void MigrationSourceManager::_cleanup(bool completeMigration) noexcept { "Failed to complete the migration {migrationId} with " "{chunkMigrationRequestParameters} due to: {error}", "Failed to complete the migration", - "chunkMigrationRequestParameters"_attr = redact(_args.toString()), + "chunkMigrationRequestParameters"_attr = redact(_args.toBSON({})), "error"_attr = redact(ex), "migrationId"_attr = _coordinator->getMigrationId()); // Something went really wrong when completing the migration just unset the metadata and let // the next op to recover. UninterruptibleLockGuard noInterrupt(_opCtx->lockState()); - AutoGetCollection autoColl(_opCtx, _args.getNss(), MODE_IX); - CollectionShardingRuntime::get(_opCtx, _args.getNss())->clearFilteringMetadata(_opCtx); + AutoGetCollection autoColl(_opCtx, nss(), MODE_IX); + CollectionShardingRuntime::get(_opCtx, nss())->clearFilteringMetadata(_opCtx); } } BSONObj MigrationSourceManager::getMigrationStatusReport() const { - return migrationutil::makeMigrationStatusDocument(_args.getNss(), - _args.getFromShardId(), - _args.getToShardId(), + return migrationutil::makeMigrationStatusDocument(_args.getCommandParameter(), + _args.getFromShard(), + _args.getToShard(), true, - _args.getMinKey(), - _args.getMaxKey()); + *_args.getMin(), + *_args.getMax()); } MigrationSourceManager::ScopedRegisterer::ScopedRegisterer( @@ -857,8 +900,8 @@ MigrationSourceManager::ScopedRegisterer::ScopedRegisterer( MigrationSourceManager::ScopedRegisterer::~ScopedRegisterer() { UninterruptibleLockGuard noInterrupt(_msm->_opCtx->lockState()); - AutoGetCollection autoColl(_msm->_opCtx, _msm->_args.getNss(), MODE_IX); - auto csr = CollectionShardingRuntime::get(_msm->_opCtx, _msm->_args.getNss()); + AutoGetCollection autoColl(_msm->_opCtx, _msm->_args.getCommandParameter(), MODE_IX); + auto csr = CollectionShardingRuntime::get(_msm->_opCtx, _msm->_args.getCommandParameter()); auto csrLock = CollectionShardingRuntime::CSRLock::lockExclusive(_msm->_opCtx, csr); invariant(_msm == std::exchange(msmForCsr(csr), nullptr)); } diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 9b1b64dd87f..d93c701f3d5 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -35,7 +35,7 @@ #include "mongo/db/s/migration_chunk_cloner_source.h" #include "mongo/db/s/migration_coordinator.h" #include "mongo/db/s/move_timing_helper.h" -#include "mongo/s/request_types/move_chunk_request.h" +#include "mongo/s/request_types/move_range_request_gen.h" #include "mongo/util/timer.h" namespace mongo { @@ -101,7 +101,8 @@ public: * to be after acquiring the distributed lock. */ MigrationSourceManager(OperationContext* opCtx, - MoveChunkRequest request, + ShardsvrMoveRange&& request, + WriteConcernOptions&& writeConcern, ConnectionString donorConnStr, HostAndPort recipientHost); ~MigrationSourceManager(); @@ -171,6 +172,10 @@ public: */ BSONObj getMigrationStatusReport() const; + const NamespaceString& nss() { + return _args.getCommandParameter(); + } + private: // Used to track the current state of the source manager. See the methods above, which have // comments explaining the various state transitions. @@ -206,8 +211,11 @@ private: // The caller must guarantee it outlives the MigrationSourceManager. 
OperationContext* const _opCtx; - // The parameters to the moveChunk command - const MoveChunkRequest _args; + // The parameters to the moveRange command + ShardsvrMoveRange _args; + + // The write concern received for the moveRange command + const WriteConcernOptions _writeConcern; // The resolved connection string of the donor shard const ConnectionString _donorConnStr; diff --git a/src/mongo/db/s/move_timing_helper.cpp b/src/mongo/db/s/move_timing_helper.cpp index f3270f67986..f62adc83956 100644 --- a/src/mongo/db/s/move_timing_helper.cpp +++ b/src/mongo/db/s/move_timing_helper.cpp @@ -44,8 +44,8 @@ namespace mongo { MoveTimingHelper::MoveTimingHelper(OperationContext* opCtx, const std::string& where, const std::string& ns, - const BSONObj& min, - const BSONObj& max, + const boost::optional<BSONObj>& min, + const boost::optional<BSONObj>& max, int totalNumSteps, std::string* cmdErrmsg, const ShardId& toShard, @@ -55,17 +55,19 @@ MoveTimingHelper::MoveTimingHelper(OperationContext* opCtx, _ns(ns), _to(toShard), _from(fromShard), + _min(min), + _max(max), _totalNumSteps(totalNumSteps), _cmdErrmsg(cmdErrmsg), - _nextStep(0) { - _b.append("min", min); - _b.append("max", max); -} + _nextStep(0) {} MoveTimingHelper::~MoveTimingHelper() { // even if logChange doesn't throw, bson does // sigh try { + _b.append("min", _min.get_value_or(BSONObj())); + _b.append("max", _max.get_value_or(BSONObj())); + if (_to.isValid()) { _b.append("to", _to.toString()); } diff --git a/src/mongo/db/s/move_timing_helper.h b/src/mongo/db/s/move_timing_helper.h index 9d7f75ee90e..a90dd465090 100644 --- a/src/mongo/db/s/move_timing_helper.h +++ b/src/mongo/db/s/move_timing_helper.h @@ -45,14 +45,22 @@ public: MoveTimingHelper(OperationContext* opCtx, const std::string& where, const std::string& ns, - const BSONObj& min, - const BSONObj& max, + const boost::optional<BSONObj>& min, + const boost::optional<BSONObj>& max, int totalNumSteps, std::string* cmdErrmsg, const ShardId& toShard, const ShardId& fromShard); ~MoveTimingHelper(); + void setMin(const BSONObj& min) { + _min.emplace(min); + } + + void setMax(const BSONObj& max) { + _max.emplace(max); + } + void done(int step); private: @@ -64,6 +72,8 @@ private: const std::string _ns; const ShardId _to; const ShardId _from; + + boost::optional<BSONObj> _min, _max; const int _totalNumSteps; const std::string* _cmdErrmsg; diff --git a/src/mongo/db/s/shardsvr_move_range_command.cpp b/src/mongo/db/s/shardsvr_move_range_command.cpp index ad92264d0cd..0eb40a7fd31 100644 --- a/src/mongo/db/s/shardsvr_move_range_command.cpp +++ b/src/mongo/db/s/shardsvr_move_range_command.cpp @@ -33,7 +33,6 @@ #include "mongo/db/commands.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/active_migrations_registry.h" -#include "mongo/db/s/auto_split_vector.h" #include "mongo/db/s/migration_source_manager.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/sharding_statistics.h" @@ -86,31 +85,6 @@ public: void typedRun(OperationContext* opCtx) { uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); opCtx->setAlwaysInterruptAtStepDownOrUp(); - const auto& originalReq = request(); - - const auto WC = opCtx->getWriteConcern(); - BSONObjBuilder moveChunkReqBuilder(originalReq.toBSON({})); - moveChunkReqBuilder.append(WriteConcernOptions::kWriteConcernField, WC.toBSON()); - - // TODO SERVER-64926 do not assume min always present - const auto& min = *request().getMin(); - - // TODO SERVER-64817 compute missing bound of `moveRange` within 
MigrationSourceManager - if (!originalReq.getMax().is_initialized()) { - // Compute the max bound in case only `min` is set (moveRange) - const auto cm = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, ns())); - uassert(ErrorCodes::NamespaceNotSharded, - "Could not move chunk. Collection is no longer sharded", - cm.isSharded()); - - const auto owningChunk = cm.findIntersectingChunkWithSimpleCollation(min); - const auto max = computeMaxBound(opCtx, owningChunk, cm.getShardKeyPattern()); - moveChunkReqBuilder.append(MoveRangeRequest::kMaxFieldName, max); - } - - MoveChunkRequest moveChunkRequest = uassertStatusOK( - MoveChunkRequest::createFromCommand(ns(), moveChunkReqBuilder.obj())); // Make sure we're as up-to-date as possible with shard information. This catches the // case where we might have changed a shard's host by removing/adding a shard with the @@ -118,13 +92,14 @@ public: Grid::get(opCtx)->shardRegistry()->reload(opCtx); auto scopedMigration = uassertStatusOK( - ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, moveChunkRequest)); + ActiveMigrationsRegistry::get(opCtx).registerDonateChunk(opCtx, request())); // Check if there is an existing migration running and if so, join it if (scopedMigration.mustExecute()) { auto moveChunkComplete = ExecutorFuture<void>(_getExecutor()) - .then([moveChunkRequest, + .then([req = request(), + writeConcern = opCtx->getWriteConcern(), scopedMigration = std::move(scopedMigration), serviceContext = opCtx->getServiceContext()]() mutable { // This local variable is created to enforce that the scopedMigration is @@ -157,7 +132,7 @@ public: Status status = {ErrorCodes::InternalError, "Uninitialized value"}; try { - _runImpl(opCtx, moveChunkRequest); + _runImpl(opCtx, std::move(req), std::move(writeConcern)); status = Status::OK(); } catch (const DBException& e) { status = e.toStatus(); @@ -180,7 +155,7 @@ public: uassertStatusOK(scopedMigration.waitForCompletion(opCtx)); } - if (moveChunkRequest.getWaitForDelete()) { + if (request().getWaitForDelete()) { // Ensure we capture the latest opTime in the system, since range deletion happens // asynchronously with a different OperationContext. This must be done after the // above join, because each caller must set the opTime to wait for writeConcern for @@ -214,35 +189,10 @@ public: ActionType::internal)); } - /* - * Compute the max bound in case only `min` is set (moveRange). - * - * Taking into account the provided max chunk size, returns: - * - A `max` bound to perform split+move in case the chunk owning `min` is splittable. - * - The `max` bound of the chunk owning `min in case it can't be split (too small or - * jumbo). 
- */ - BSONObj computeMaxBound(OperationContext* opCtx, - const Chunk& owningChunk, - const ShardKeyPattern& skPattern) { - // TODO SERVER-64926 do not assume min always present - const auto& min = *request().getMin(); - auto [splitKeys, _] = autoSplitVector(opCtx, - ns(), - skPattern.toBSON(), - min, - owningChunk.getMax(), - *request().getMaxChunkSizeBytes(), - 1); - if (splitKeys.size()) { - return std::move(splitKeys.front()); - } - - return owningChunk.getMax(); - } - - static void _runImpl(OperationContext* opCtx, const MoveChunkRequest& moveChunkRequest) { - if (moveChunkRequest.getFromShardId() == moveChunkRequest.getToShardId()) { + static void _runImpl(OperationContext* opCtx, + ShardsvrMoveRange&& request, + WriteConcernOptions&& writeConcern) { + if (request.getFromShard() == request.getToShard()) { // TODO: SERVER-46669 handle wait for delete. return; } @@ -251,18 +201,18 @@ public: auto const shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto donorConnStr = - uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getFromShardId())) + uassertStatusOK(shardRegistry->getShard(opCtx, request.getFromShard())) ->getConnString(); const auto recipientHost = uassertStatusOK([&] { - auto recipientShard = uassertStatusOK( - shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId())); + auto recipientShard = + uassertStatusOK(shardRegistry->getShard(opCtx, request.getToShard())); return recipientShard->getTargeter()->findHost( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}); }()); MigrationSourceManager migrationSourceManager( - opCtx, moveChunkRequest, donorConnStr, recipientHost); + opCtx, std::move(request), std::move(writeConcern), donorConnStr, recipientHost); migrationSourceManager.startClone(); migrationSourceManager.awaitToCatchUp(); |
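With SERVER-64817 the command no longer fills in the missing max bound before registering the migration; the computeMaxBound()/autoSplitVector logic removed above is expected to be re-hosted inside MigrationSourceManager, which now receives the ShardsvrMoveRange request (plus the caller's write concern) directly. A rough sketch of that helper, reconstructed from the code removed above — its placement, name, and exact signature inside MigrationSourceManager are assumptions, not part of this patch:

// Sketch based on the removed computeMaxBound(); illustrative only.
// Returns a split point past `min` when the chunk owning `min` is big enough to split,
// otherwise the owning chunk's own max bound (chunk too small or jumbo).
BSONObj computeMissingMaxBound(OperationContext* opCtx,
                               const NamespaceString& nss,
                               const BSONObj& min,
                               const Chunk& owningChunk,
                               const ShardKeyPattern& skPattern,
                               long long maxChunkSizeBytes) {
    // Request a single split point, as the removed command-side code did.
    auto [splitKeys, _] = autoSplitVector(
        opCtx, nss, skPattern.toBSON(), min, owningChunk.getMax(), maxChunkSizeBytes, 1);
    if (!splitKeys.empty()) {
        return std::move(splitKeys.front());
    }
    return owningChunk.getMax();
}

Deferring the computation until after ActiveMigrationsRegistry::registerDonateChunk also means the registry compares moveRange requests as they were received (min only), so a duplicate request can presumably still join the in-flight migration even though the max bound is resolved later on the donor.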