author    | Paolo Polato <paolo.polato@mongodb.com>          | 2022-05-29 20:07:40 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-29 20:43:20 +0000
commit    | 9ae0a24f4d2323424784a941c783c7cf39c3ec20 (patch)
tree      | 5ac67365b48a57876941ab1c0660582227eff09a
parent    | 491f15aaca5d9c4f8a808f6a0a449ad6499467e3 (diff)
SERVER-66480 Introduce the _shardsvrJoinMigrations shard command to set a barrier on Balancer initialisation
(cherry picked from commit 531dfe764bc0645f5e676feaf5687100c0a5e612)
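
In outline: when the balancer's command scheduler starts up, it now broadcasts the new _shardsvrJoinMigrations command to every shard and waits for all acknowledgements before recovering any persisted migration request; a shard only acknowledges once the chunk migrations it is executing (as donor or recipient) have drained. A minimal, self-contained sketch of that fan-out/join idea (every name below is invented for illustration; this is not the MongoDB implementation):

    #include <future>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    bool joinMigrationsOnShard(const std::string& shardId) {
        // Stand-in for sending _shardsvrJoinMigrations to one shard and parsing the reply.
        return shardId != "shard-down";
    }

    void waitForQuiescedCluster(const std::vector<std::string>& shardIds) {
        std::vector<std::pair<std::string, std::future<bool>>> pending;
        for (const auto& shard : shardIds) {
            pending.emplace_back(shard,
                                 std::async(std::launch::async, joinMigrationsOnShard, shard));
        }
        for (auto& [shard, fut] : pending) {
            if (!fut.get()) {
                // Failures are logged, not thrown: start-up proceeds regardless.
                std::cerr << "warning: could not join migrations on " << shard << '\n';
            }
        }
    }

    int main() {
        waitForQuiescedCluster({"shard0", "shard1", "shard-down"});
        return 0;
    }

As in the real waitForQuiescedCluster() in the diff below, a per-shard failure is deliberately downgraded to a warning so that balancer start-up can proceed regardless.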
-rw-r--r-- | jstests/core/views/views_all_commands.js                       |   1
-rw-r--r-- | jstests/replsets/db_reads_while_recovering_all_commands.js     |   1
-rw-r--r-- | jstests/sharding/read_write_concern_defaults_application.js    |   1
-rw-r--r-- | src/mongo/db/s/SConscript                                      |   1
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp   |  55
-rw-r--r-- | src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp   | 136
-rw-r--r-- | src/mongo/db/s/sharding_util.cpp                               |  22
-rw-r--r-- | src/mongo/db/s/sharding_util.h                                 |   3
-rw-r--r-- | src/mongo/db/s/shardsvr_join_migrations_command.cpp            | 104
-rw-r--r-- | src/mongo/s/SConscript                                         |   1
-rw-r--r-- | src/mongo/s/request_types/shardsvr_join_migrations_request.idl |  42
11 files changed, 293 insertions, 74 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index bbdb2623e75..45f19278511 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -159,6 +159,7 @@ let viewsCommandTests = {
     _shardsvrDropDatabase: {skip: isAnInternalCommand},
     _shardsvrDropDatabaseParticipant: {skip: isAnInternalCommand},
     _shardsvrGetStatsForBalancing: {skip: isAnInternalCommand},
+    _shardsvrJoinMigrations: {skip: isAnInternalCommand},
     _shardsvrMovePrimary: {skip: isAnInternalCommand},
     _shardsvrMoveRange: {
         command: {_shardsvrMoveRange: "test.view"},
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index ec95e98892e..02fc2486ca4 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -87,6 +87,7 @@ const allCommands = {
     _shardsvrDropIndexes: {skip: isAnInternalCommand},
     _shardsvrCreateCollectionParticipant: {skip: isPrimaryOnly},
     _shardsvrGetStatsForBalancing: {skip: isPrimaryOnly},
+    _shardsvrJoinMigrations: {skip: isAnInternalCommand},
     _shardsvrMovePrimary: {skip: isPrimaryOnly},
     _shardsvrMoveRange: {skip: isPrimaryOnly},
     _shardsvrRenameCollection: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index b264ec38ee6..9b8b3a1efda 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -157,6 +157,7 @@ let testCases = {
     _shardsvrDropDatabase: {skip: "internal command"},
     _shardsvrDropDatabaseParticipant: {skip: "internal command"},
     _shardsvrGetStatsForBalancing: {skip: "internal command"},
+    _shardsvrJoinMigrations: {skip: "internal command"},
     _shardsvrMovePrimary: {skip: "internal command"},
     _shardsvrMoveRange: {
         skip:
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index fa1e774f18e..237dad8bcf8 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -419,6 +419,7 @@ env.Library(
         'shardsvr_drop_database_participant_command.cpp',
         'shardsvr_drop_indexes_command.cpp',
         'shardsvr_get_stats_for_balancing_command.cpp',
+        'shardsvr_join_migrations_command.cpp',
         'shardsvr_merge_chunks_command.cpp',
         'shardsvr_move_primary_command.cpp',
         'shardsvr_move_range_command.cpp',
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 797d4c00bbc..6bff2a90f38 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -32,10 +32,12 @@
 #include "mongo/db/s/balancer/balancer_commands_scheduler_impl.h"
 #include "mongo/db/client.h"
 #include "mongo/db/dbdirectclient.h"
+#include "mongo/db/s/sharding_util.h"
 #include "mongo/logv2/log.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/request_types/migration_secondary_throttle_options.h"
+#include "mongo/s/request_types/shardsvr_join_migrations_request_gen.h"
 #include "mongo/s/shard_id.h"
 #include "mongo/s/shard_key_pattern.h"
 #include "mongo/util/fail_point.h"
@@ -47,6 +49,35 @@ namespace {
 MONGO_FAIL_POINT_DEFINE(pauseSubmissionsFailPoint);
 MONGO_FAIL_POINT_DEFINE(deferredCleanupCompletedCheckpoint);
 
+void waitForQuiescedCluster(OperationContext* opCtx) {
+    const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+    ShardsvrJoinMigrations joinShardOnMigrationsRequest;
+    joinShardOnMigrationsRequest.setDbName(NamespaceString::kAdminDb);
+
+    auto unquiescedShardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
+
+    const auto responses =
+        sharding_util::sendCommandToShards(opCtx,
+                                           NamespaceString::kAdminDb.toString(),
+                                           joinShardOnMigrationsRequest.toBSON({}),
+                                           unquiescedShardIds,
+                                           executor,
+                                           false /*throwOnError*/);
+    for (const auto& r : responses) {
+        auto responseOutcome = r.swResponse.isOK()
+            ? getStatusFromCommandResult(r.swResponse.getValue().data)
+            : r.swResponse.getStatus();
+
+        if (!responseOutcome.isOK()) {
+            LOGV2_WARNING(6648001,
+                          "Could not complete _shardsvrJoinMigrations on shard",
+                          "error"_attr = responseOutcome,
+                          "shard"_attr = r.shardId);
+        }
+    }
+}
+
+
 Status processRemoteResponse(const executor::RemoteCommandResponse& remoteResponse) {
     if (!remoteResponse.status.isOK()) {
         return remoteResponse.status;
@@ -124,7 +155,7 @@ std::vector<RequestData> rebuildRequestsFromRecoveryInfo(
         FindCommandRequest findRequest{MigrationType::ConfigNS};
         dbClient.find(std::move(findRequest), ReadPreferenceSetting{}, documentProcessor);
     } catch (const DBException& e) {
-        LOGV2_ERROR(5847215, "Failed to load requests to recover", "error"_attr = redact(e));
+        LOGV2_ERROR(5847215, "Failed to fetch requests to recover", "error"_attr = redact(e));
     }
 
     return rebuiltRequests;
@@ -177,14 +208,26 @@ void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx,
     if (!_executor) {
         _executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
     }
-    auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues);
-    _numRequestsToRecover = requestsToRecover.size();
-    _state = _numRequestsToRecover == 0 ? SchedulerState::Running : SchedulerState::Recovering;
+    _state = SchedulerState::Recovering;
 
-    for (auto& requestToRecover : requestsToRecover) {
-        _enqueueRequest(lg, std::move(requestToRecover));
+    try {
+        waitForQuiescedCluster(opCtx);
+    } catch (const DBException& e) {
+        LOGV2_WARNING(
+            6648002, "Could not join migration activity on shards", "error"_attr = redact(e));
     }
 
+    auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues);
+    _numRequestsToRecover = requestsToRecover.size();
+    if (_numRequestsToRecover == 0) {
+        LOGV2(6648003, "Balancer scheduler recovery complete. Switching to regular execution");
+        _state = SchedulerState::Running;
+    } else {
+        for (auto& requestToRecover : requestsToRecover) {
+            // TODO I'd prefer to simply delete the entries.
+            _enqueueRequest(lg, std::move(requestToRecover));
+        }
+    }
 
     _workerThreadHandle = stdx::thread([this] { _workerThread(); });
 }
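
Worth noting in the hunk above: start() now enters SchedulerState::Recovering unconditionally, runs the quiescing barrier with failures demoted to warnings, and only afterwards loads the recovery documents that decide whether it may switch straight to Running. A compact, runnable model of that control flow using only the C++ standard library (MiniScheduler and both callbacks are stand-ins invented for the sketch):

    #include <functional>
    #include <iostream>
    #include <stdexcept>
    #include <vector>

    enum class SchedulerState { Stopped, Recovering, Running };

    struct MiniScheduler {
        SchedulerState state = SchedulerState::Stopped;

        void start(const std::function<void()>& waitForQuiescedCluster,
                   const std::function<std::vector<int>()>& loadRecoveryDocs) {
            // 1. Enter Recovering unconditionally, before the barrier runs.
            state = SchedulerState::Recovering;

            // 2. Best-effort barrier: a failure is logged as a warning, never propagated.
            try {
                waitForQuiescedCluster();
            } catch (const std::exception& e) {
                std::cerr << "warning: could not join migration activity: " << e.what() << '\n';
            }

            // 3. Only an empty recovery set lets the scheduler switch straight to Running.
            if (loadRecoveryDocs().empty()) {
                state = SchedulerState::Running;
            }
        }
    };

    int main() {
        MiniScheduler scheduler;
        scheduler.start([] { throw std::runtime_error("shard unreachable"); },
                        [] { return std::vector<int>{}; });
        std::cout << (scheduler.state == SchedulerState::Running) << '\n';  // prints 1
        return 0;
    }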
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index a2f2dc56a29..aab01274138 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -126,6 +126,28 @@ protected:
         ConfigServerTestFixture::tearDown();
     }
 
+    /*
+     * Extra setup function to define the whole sequence of (mocked) remote command responses
+     * that a test is expected to receive during its execution.
+     * - Must be invoked before running _scheduler.start()
+     * - The returned future must be stored in a local variable and kept in scope to ensure
+     *   that the sequence gets generated
+     */
+    executor::NetworkTestEnv::FutureHandle<void> setRemoteResponses(
+        std::vector<executor::NetworkTestEnv::OnCommandFunction> remoteResponseGenerators = {}) {
+        std::vector<executor::NetworkTestEnv::OnCommandFunction> generatorsWithStartSequence;
+        // Set an OK response for every _shardsvrJoinMigrations command sent out by the start()
+        // method of the commands scheduler
+        for (size_t i = 0; i < kShardList.size(); ++i) {
+            generatorsWithStartSequence.push_back(
+                [&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); });
+        }
+        generatorsWithStartSequence.insert(generatorsWithStartSequence.end(),
+                                           remoteResponseGenerators.begin(),
+                                           remoteResponseGenerators.end());
+        return launchAsync([this, g = std::move(generatorsWithStartSequence)] { onCommands(g); });
+    }
+
     void configureTargeter(OperationContext* opCtx, ShardId shardId, const HostAndPort& host) {
         auto targeter = RemoteCommandTargeterMock::get(
             uassertStatusOK(shardRegistry()->getShard(opCtx, shardId))->getTargeter());
@@ -136,8 +158,10 @@ protected:
 };
 
 TEST_F(BalancerCommandsSchedulerTest, StartAndStopScheduler) {
+    auto remoteResponsesFuture = setRemoteResponses();
     _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
     _scheduler.stop();
+    remoteResponsesFuture.default_timed_get();
 }
 
 TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
@@ -145,16 +169,16 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
         globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
     auto timesEnteredFailPoint =
         deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
+    auto remoteResponsesFuture = setRemoteResponses(
+        {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
+
     _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
     MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand(
-            [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
-    });
+
     auto futureResponse = _scheduler.requestMoveChunk(
         operationContext(), migrateInfo, getMoveChunkSettings(), false /* issuedByRemoteUser */);
     ASSERT_OK(futureResponse.getNoThrow());
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
     // Ensure DistLock is released correctly
     {
@@ -174,6 +198,8 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
         globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
     auto timesEnteredFailPoint =
         deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
+    auto remoteResponsesFuture = setRemoteResponses(
+        {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
     _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
     ShardsvrMoveRange shardsvrRequest(kNss);
     shardsvrRequest.setDbName(NamespaceString::kAdminDb);
@@ -184,14 +210,10 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
     moveRangeRequestBase.setMin({});
     moveRangeRequestBase.setMax({});
 
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand(
-            [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
-    });
     auto futureResponse = _scheduler.requestMoveRange(
         operationContext(), shardsvrRequest, WriteConcernOptions(), false /* issuedByRemoteUser */);
     ASSERT_OK(futureResponse.getNoThrow());
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
     // Ensure DistLock is released correctly
     {
@@ -207,22 +229,21 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
 }
 
 TEST_F(BalancerCommandsSchedulerTest, SuccessfulMergeChunkCommand) {
+    auto remoteResponsesFuture = setRemoteResponses(
+        {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
     _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand(
-            [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
-    });
     ChunkRange range(BSON("x" << 0), BSON("x" << 20));
     ChunkVersion version(1, 1, OID::gen(), Timestamp(10));
     auto futureResponse =
         _scheduler.requestMergeChunks(operationContext(), kNss, kShardId0, range, version);
     ASSERT_OK(futureResponse.getNoThrow());
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     _scheduler.stop();
 }
 
 TEST_F(BalancerCommandsSchedulerTest, MergeChunkNonexistentShard) {
+    auto remoteResponsesFuture = setRemoteResponses();
     _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
     ChunkRange range(BSON("x" << 0), BSON("x" << 20));
     ChunkVersion version(1, 1, OID::gen(), Timestamp(10));
@@ -230,12 +251,11 @@ TEST_F(BalancerCommandsSchedulerTest, MergeChunkNonexistentShard) {
         operationContext(), kNss, ShardId("nonexistent"), range, version);
     auto shardNotFoundError = Status{ErrorCodes::ShardNotFound, "Shard nonexistent not found"};
     ASSERT_EQ(futureResponse.getNoThrow(), shardNotFoundError);
+    remoteResponsesFuture.default_timed_get();
     _scheduler.stop();
 }
 
 TEST_F(BalancerCommandsSchedulerTest, SuccessfulAutoSplitVectorCommand) {
-    _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
-    ChunkType splitChunk = makeChunk(0, kShardId0);
     BSONObjBuilder autoSplitVectorResponse;
     autoSplitVectorResponse.append("ok", "1");
     BSONArrayBuilder splitKeys(autoSplitVectorResponse.subarrayStart("splitKeys"));
@@ -243,11 +263,14 @@
     splitKeys.append(BSON("x" << 9));
     splitKeys.done();
     autoSplitVectorResponse.append("continuation", false);
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand([&](const executor::RemoteCommandRequest& request) {
+
+    auto remoteResponsesFuture =
+        setRemoteResponses({[&](const executor::RemoteCommandRequest& request) {
            return autoSplitVectorResponse.obj();
-        });
-    });
+        }});
+    _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
+
+    ChunkType splitChunk = makeChunk(0, kShardId0);
     auto futureResponse = _scheduler.requestAutoSplitVector(operationContext(),
                                                             kNss,
                                                             splitChunk.getShard(),
@@ -263,17 +286,15 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulAutoSplitVectorCommand) {
     ASSERT_BSONOBJ_EQ(receivedSplitKeys[0], BSON("x" << 7));
     ASSERT_BSONOBJ_EQ(receivedSplitKeys[1], BSON("x" << 9));
     ASSERT_FALSE(continuation);
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     _scheduler.stop();
 }
 
 TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitChunkCommand) {
+    auto remoteResponsesFuture = setRemoteResponses(
+        {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
     _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
     ChunkType splitChunk = makeChunk(0, kShardId0);
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand(
-            [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
-    });
     auto futureResponse = _scheduler.requestSplitChunk(operationContext(),
                                                        kNss,
                                                        splitChunk.getShard(),
@@ -283,21 +304,21 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitChunkCommand) {
                                                        splitChunk.getMax(),
                                                        std::vector<BSONObj>{BSON("x" << 5)});
     ASSERT_OK(futureResponse.getNoThrow());
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     _scheduler.stop();
 }
 
 TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
-    _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
-    ChunkType chunk = makeChunk(0, kShardId0);
     BSONObjBuilder chunkSizeResponse;
     chunkSizeResponse.append("ok", "1");
     chunkSizeResponse.append("size", 156);
     chunkSizeResponse.append("numObjects", 25);
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand(
-            [&](const executor::RemoteCommandRequest& request) { return chunkSizeResponse.obj(); });
-    });
+    auto remoteResponsesFuture = setRemoteResponses(
+        {[&](const executor::RemoteCommandRequest& request) { return chunkSizeResponse.obj(); }});
+
+    _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
+    ChunkType chunk = makeChunk(0, kShardId0);
+
     auto futureResponse = _scheduler.requestDataSize(operationContext(),
                                                      kNss,
                                                      chunk.getShard(),
@@ -310,7 +331,7 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
     auto receivedDataSize = swReceivedDataSize.getValue();
     ASSERT_EQ(receivedDataSize.sizeBytes, 156);
     ASSERT_EQ(receivedDataSize.numObjects, 25);
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     _scheduler.stop();
 }
 
@@ -319,17 +340,16 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) {
         globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
     auto timesEnteredFailPoint =
         deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
-
+    auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"};
+    auto remoteResponsesFuture = setRemoteResponses(
+        {[&](const executor::RemoteCommandRequest& request) { return timeoutError; }});
     _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
     MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
-    auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"};
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand([&](const executor::RemoteCommandRequest& request) { return timeoutError; });
-    });
+
     auto futureResponse = _scheduler.requestMoveChunk(
         operationContext(), migrateInfo, getMoveChunkSettings(), false /* issuedByRemoteUser */);
     ASSERT_EQUALS(futureResponse.getNoThrow(), timeoutError);
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
 
     // Ensure DistLock is released correctly
@@ -366,6 +386,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) {
 TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancerStops) {
     SemiFuture<void> futureResponse;
     {
+        auto remoteResponsesFuture = setRemoteResponses();
         FailPointEnableBlock failPoint("pauseSubmissionsFailPoint");
         _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
         MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
@@ -374,6 +395,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancer
                                                       getMoveChunkSettings(),
                                                       false /* issuedByRemoteUser */);
         _scheduler.stop();
+        remoteResponsesFuture.default_timed_get();
     }
     ASSERT_EQUALS(futureResponse.getNoThrow(),
                   Status(ErrorCodes::BalancerInterrupted,
@@ -392,12 +414,11 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancer
 TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenRequestIsSubmitted) {
     auto opCtx = operationContext();
     auto defaultValues = getMigrationRecoveryDefaultValues();
-    _scheduler.start(opCtx, getMigrationRecoveryDefaultValues());
     MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
     auto requestSettings = getMoveChunkSettings(kCustomizedMaxChunkSizeBytes);
     auto const serviceContext = getServiceContext();
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand([&, serviceContext](const executor::RemoteCommandRequest& request) {
+    auto remoteResponsesFuture =
+        setRemoteResponses({[&, serviceContext](const executor::RemoteCommandRequest& request) {
             ThreadClient tc("Test", getGlobalServiceContext());
             auto opCtxHolder = Client::getCurrent()->makeOperationContext();
             // As long as the request is not completed, a persisted recovery document should
@@ -426,14 +447,13 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq
                 boost::none);
 
             ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), recoveredCommand->serialise());
-            return BSON("ok" << true);
-        });
-    });
-
+            return OkReply().toBSON();
+        }});
+    _scheduler.start(opCtx, getMigrationRecoveryDefaultValues());
     auto deferredResponse = _scheduler.requestMoveChunk(
         operationContext(), migrateInfo, requestSettings, false /* issuedByRemoteUser */);
 
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     _scheduler.stop();
 }
 
@@ -460,20 +480,18 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering
 
     // 2. Once started, the persisted document should trigger the remote execution of a request...
     auto defaultValues = getMigrationRecoveryDefaultValues();
+    auto moveChunkRemoteResponseGenerator = [&](const executor::RemoteCommandRequest& request) {
+        auto expectedCommandInfo = MoveChunkCommandInfo::recoverFrom(recoveryInfo, defaultValues);
+        // 3. ... Which content should match the recovery doc & configuration.
+        ASSERT_BSONOBJ_EQ(expectedCommandInfo->serialise(), request.cmdObj);
+        return OkReply().toBSON();
+    };
+    auto remoteResponsesFuture = setRemoteResponses({moveChunkRemoteResponseGenerator});
     _scheduler.start(opCtx, defaultValues);
-    auto networkResponseFuture = launchAsync([&]() {
-        onCommand([&](const executor::RemoteCommandRequest& request) {
-            auto expectedCommandInfo =
-                MoveChunkCommandInfo::recoverFrom(recoveryInfo, defaultValues);
-            // 3. ... Which content should match the recovery doc & configuration.
-            ASSERT_BSONOBJ_EQ(expectedCommandInfo->serialise(), request.cmdObj);
-            return BSON("ok" << true);
-        });
-    });
 
     // 4. Once the recovery phase is complete, no persisted documents should remain
     //    (stop() is invoked to ensure that the observed state is stable).
-    networkResponseFuture.default_timed_get();
+    remoteResponsesFuture.default_timed_get();
     _scheduler.stop();
     auto persistedCommandDocs = getPersistedCommandDocuments(operationContext());
     ASSERT_EQUALS(0, persistedCommandDocs.size());
@@ -484,6 +502,7 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL
     FailPoint* failpoint = globalFailPointRegistry().find("pauseSubmissionsFailPoint");
     failpoint->setMode(FailPoint::Mode::alwaysOn);
     {
+        auto remoteResponsesFuture = setRemoteResponses();
         _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
         opCtx = Client::getCurrent()->getOperationContext();
         const std::string whyMessage(str::stream()
@@ -497,6 +516,7 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL
                                                       migrateInfo,
                                                       getMoveChunkSettings(),
                                                       false /* issuedByRemoteUser */);
+        remoteResponsesFuture.default_timed_get();
        ASSERT_EQ(
            futureResponse.getNoThrow(),
            Status(ErrorCodes::LockBusy, "Failed to acquire dist lock testDb.testColl locally"));
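
All the test changes above follow a single pattern: since start() now fans out one _shardsvrJoinMigrations request per shard, every test has to prime the mocked network with one OK reply per shard ahead of its own responses, which is what the new setRemoteResponses() helper encapsulates. The priming idea in isolation (a self-contained sketch; Response and withStartSequence are stand-ins, not test-fixture APIs):

    #include <cstddef>
    #include <functional>
    #include <iterator>
    #include <string>
    #include <vector>

    using Response = std::function<std::string()>;

    // Prepend one {ok: 1} reply per shard (consumed by the start-up barrier) ahead of
    // the test-specific generators, mirroring what setRemoteResponses() does.
    std::vector<Response> withStartSequence(std::vector<Response> testResponses,
                                            std::size_t shardCount) {
        std::vector<Response> sequence;
        for (std::size_t i = 0; i < shardCount; ++i) {
            sequence.push_back([] { return std::string{"{ok: 1}"}; });
        }
        sequence.insert(sequence.end(),
                        std::make_move_iterator(testResponses.begin()),
                        std::make_move_iterator(testResponses.end()));
        return sequence;
    }

    int main() {
        // Two shards to quiesce, then the reply the test itself is interested in.
        auto sequence = withStartSequence({[] { return std::string{"{size: 156}"}; }}, 2);
        return sequence.size() == 3 ? 0 : 1;
    }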
diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp
index 002d65d22dc..9bd6c0ae795 100644
--- a/src/mongo/db/s/sharding_util.cpp
+++ b/src/mongo/db/s/sharding_util.cpp
@@ -64,7 +64,8 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
     StringData dbName,
     const BSONObj& command,
     const std::vector<ShardId>& shardIds,
-    const std::shared_ptr<executor::TaskExecutor>& executor) {
+    const std::shared_ptr<executor::TaskExecutor>& executor,
+    const bool throwOnError) {
     std::vector<AsyncRequestsSender::Request> requests;
     for (const auto& shardId : shardIds) {
         requests.emplace_back(shardId, command);
@@ -89,17 +90,20 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
             // Retrieve the responses and throw at the first failure.
             auto response = ars.next();
 
-            const auto errorContext = "Failed command {} for database '{}' on shard '{}'"_format(
-                command.toString(), dbName, StringData{response.shardId});
+            if (throwOnError) {
+                const auto errorContext =
+                    "Failed command {} for database '{}' on shard '{}'"_format(
+                        command.toString(), dbName, StringData{response.shardId});
 
-            auto shardResponse =
-                uassertStatusOKWithContext(std::move(response.swResponse), errorContext);
+                auto shardResponse =
+                    uassertStatusOKWithContext(std::move(response.swResponse), errorContext);
 
-            auto status = getStatusFromCommandResult(shardResponse.data);
-            uassertStatusOKWithContext(status, errorContext);
+                auto status = getStatusFromCommandResult(shardResponse.data);
+                uassertStatusOKWithContext(status, errorContext);
 
-            auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
-            uassertStatusOKWithContext(wcStatus, errorContext);
+                auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+                uassertStatusOKWithContext(wcStatus, errorContext);
+            }
 
             responses.push_back(std::move(response));
         }
diff --git a/src/mongo/db/s/sharding_util.h b/src/mongo/db/s/sharding_util.h
index 3484b680748..051b90b595d 100644
--- a/src/mongo/db/s/sharding_util.h
+++ b/src/mongo/db/s/sharding_util.h
@@ -57,7 +57,8 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
     StringData dbName,
     const BSONObj& command,
     const std::vector<ShardId>& shardIds,
-    const std::shared_ptr<executor::TaskExecutor>& executor);
+    const std::shared_ptr<executor::TaskExecutor>& executor,
+    bool throwOnError = true);
 
 /**
  * Unset the `noAutosplit` and `maxChunkSizeBytes` fields from:
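
Because throwOnError defaults to true, every pre-existing caller of sendCommandToShards() keeps its fail-fast behaviour; only waitForQuiescedCluster() passes false and inspects each response itself. The compatibility trick in miniature (a self-contained sketch with invented names):

    #include <stdexcept>
    #include <vector>

    struct Reply {
        bool ok;
    };

    // Adding a defaulted trailing parameter keeps every existing call site
    // source-compatible while letting new callers opt out of fail-fast behaviour.
    std::vector<Reply> sendToAll(const std::vector<bool>& shardOutcomes,
                                 bool throwOnError = true) {
        std::vector<Reply> replies;
        for (bool ok : shardOutcomes) {
            if (!ok && throwOnError) {
                throw std::runtime_error("first failure aborts the broadcast");
            }
            replies.push_back({ok});  // lenient mode records the failure and carries on
        }
        return replies;
    }

    int main() {
        auto lenient = sendToAll({true, false, true}, /*throwOnError=*/false);
        return lenient.size() == 3 ? 0 : 1;  // all three replies collected despite the failure
    }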
diff --git a/src/mongo/db/s/shardsvr_join_migrations_command.cpp b/src/mongo/db/s/shardsvr_join_migrations_command.cpp
new file mode 100644
index 00000000000..4305b860179
--- /dev/null
+++ b/src/mongo/db/s/shardsvr_join_migrations_command.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/active_migrations_registry.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/request_types/shardsvr_join_migrations_request_gen.h"
+
+
+namespace mongo {
+namespace {
+class ShardsvrJoinMigrationsCommand final : public TypedCommand<ShardsvrJoinMigrationsCommand> {
+public:
+    using Request = ShardsvrJoinMigrations;
+
+    bool skipApiVersionCheck() const override {
+        // Internal command (config -> shard).
+        return true;
+    }
+
+    std::string help() const override {
+        return "Internal command invoked by the config server to join any chunk migration "
+               "activity executed by the shard";
+    }
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    class Invocation final : public InvocationBase {
+    public:
+        using InvocationBase::InvocationBase;
+
+        void typedRun(OperationContext* opCtx) {
+            uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
+            opCtx->setAlwaysInterruptAtStepDownOrUp();
+            {
+                Lock::GlobalLock lk(opCtx, MODE_IX);
+                uassert(ErrorCodes::InterruptedDueToReplStateChange,
+                        "Not primary while trying to join chunk migration",
+                        repl::ReplicationCoordinator::get(opCtx)->getMemberState().primary());
+            }
+
+            auto& activeMigrationRegistry = ActiveMigrationsRegistry::get(opCtx);
+            activeMigrationRegistry.lock(opCtx, kRegistryLockReason);
+            activeMigrationRegistry.unlock(kRegistryLockReason);
+        }
+
+    private:
+        static constexpr char kRegistryLockReason[] = "Running _shardsvrJoinMigrations";
+
+        NamespaceString ns() const override {
+            return {request().getDbName(), ""};
+        }
+
+        bool supportsWriteConcern() const override {
+            return false;
+        }
+
+        void doCheckAuthorization(OperationContext* opCtx) const override {
+            uassert(ErrorCodes::Unauthorized,
+                    "Unauthorized",
+                    AuthorizationSession::get(opCtx->getClient())
+                        ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+                                                           ActionType::internal));
+        }
+    };
+} _shardsvrJoinMigrationsCmd;
+
+}  // namespace
+}  // namespace mongo
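
The heart of the new command is the lock()/unlock() pair on the ActiveMigrationsRegistry: lock() blocks until every in-flight migration has completed (and briefly keeps new ones from starting), and the immediate unlock() releases the registry again, so the command acts purely as a synchronization point. A toy model of those semantics built on a readers-writer lock (illustrative only; this is not how the registry is actually implemented):

    #include <chrono>
    #include <mutex>
    #include <shared_mutex>
    #include <thread>

    std::shared_mutex migrationMutex;

    void runMigration() {
        // An in-flight migration registers itself by holding the lock in shared mode.
        std::shared_lock<std::shared_mutex> activeMigration(migrationMutex);
        std::this_thread::sleep_for(std::chrono::milliseconds(50));  // simulated chunk transfer
    }

    void joinMigrations() {
        // Taking the lock exclusively waits out every current shared holder...
        std::unique_lock<std::shared_mutex> barrier(migrationMutex);
        // ...and releasing it on scope exit lets new migrations start right away:
        // the command only synchronizes, it owns nothing afterwards.
    }

    int main() {
        std::thread m1(runMigration);
        std::thread m2(runMigration);
        joinMigrations();  // returns once the migrations it raced with have drained
        m1.join();
        m2.join();
        return 0;
    }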
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index ddcd048fe64..336e7a2ff98 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -202,6 +202,7 @@ env.Library(
         'request_types/flush_resharding_state_change.idl',
         'request_types/flush_routing_table_cache_updates.idl',
         'request_types/get_database_version.idl',
+        'request_types/shardsvr_join_migrations_request.idl',
         'request_types/merge_chunk_request.idl',
         'request_types/migration_secondary_throttle_options.cpp',
         'request_types/move_chunk_request.cpp',
diff --git a/src/mongo/s/request_types/shardsvr_join_migrations_request.idl b/src/mongo/s/request_types/shardsvr_join_migrations_request.idl
new file mode 100644
index 00000000000..e779dd5da84
--- /dev/null
+++ b/src/mongo/s/request_types/shardsvr_join_migrations_request.idl
@@ -0,0 +1,42 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+    cpp_namespace: "mongo"
+
+imports:
+    - "mongo/idl/basic_types.idl"
+
+commands:
+    _shardsvrJoinMigrations:
+        command_name: _shardsvrJoinMigrations
+        cpp_name: ShardsvrJoinMigrations
+        description: "Command to sync the caller on the completion of any chunk migration activity performed by the shard (as either donor or recipient)"
+        namespace: ignored
+        api_version: ""
+        strict: false
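
From this definition the IDL compiler emits the ShardsvrJoinMigrations request class that waitForQuiescedCluster() consumes (the generated header is mongo/s/request_types/shardsvr_join_migrations_request_gen.h, as included above). A hand-written approximation of the surface it exposes (simplified and BSON-free; the real toBSON({}) takes a pass-through fields argument that this sketch omits, and the exact wire form is an assumption based on the ignored namespace):

    #include <iostream>
    #include <string>
    #include <utility>

    // Hand-written approximation of the generated request class, for illustration only.
    class ShardsvrJoinMigrations {
    public:
        void setDbName(std::string dbName) {
            _dbName = std::move(dbName);
        }

        // Approximates the wire form {_shardsvrJoinMigrations: 1, $db: "admin"},
        // rendered here as JSON text because the sketch has no BSON library.
        std::string toBSON() const {
            return "{_shardsvrJoinMigrations: 1, $db: \"" + _dbName + "\"}";
        }

    private:
        std::string _dbName;
    };

    int main() {
        ShardsvrJoinMigrations request;
        request.setDbName("admin");
        std::cout << request.toBSON() << '\n';
        return 0;
    }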