author     Paolo Polato <paolo.polato@mongodb.com>          2022-05-29 20:07:40 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-05-29 20:43:20 +0000
commit     9ae0a24f4d2323424784a941c783c7cf39c3ec20 (patch)
tree       5ac67365b48a57876941ab1c0660582227eff09a
parent     491f15aaca5d9c4f8a808f6a0a449ad6499467e3 (diff)
SERVER-66480 introduce joinMigration shard cmd to set a barrier on the Balancer initialisation
(cherry picked from commit 531dfe764bc0645f5e676feaf5687100c0a5e612)
-rw-r--r--  jstests/core/views/views_all_commands.js                          1
-rw-r--r--  jstests/replsets/db_reads_while_recovering_all_commands.js        1
-rw-r--r--  jstests/sharding/read_write_concern_defaults_application.js       1
-rw-r--r--  src/mongo/db/s/SConscript                                          1
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp     55
-rw-r--r--  src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp    136
-rw-r--r--  src/mongo/db/s/sharding_util.cpp                                  22
-rw-r--r--  src/mongo/db/s/sharding_util.h                                     3
-rw-r--r--  src/mongo/db/s/shardsvr_join_migrations_command.cpp             104
-rw-r--r--  src/mongo/s/SConscript                                             1
-rw-r--r--  src/mongo/s/request_types/shardsvr_join_migrations_request.idl   42
11 files changed, 293 insertions, 74 deletions
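
In short, the commit makes the Balancer command scheduler send a new _shardsvrJoinMigrations command to every shard during start(), and the shard side implements the barrier by simply acquiring and then releasing the ActiveMigrationsRegistry lock, which only succeeds once all in-flight migrations have drained. The following is a minimal, self-contained sketch of that lock/unlock-as-barrier idiom, using a hypothetical MigrationRegistry class rather than the real ActiveMigrationsRegistry API:

    #include <condition_variable>
    #include <mutex>

    // Hypothetical stand-in for ActiveMigrationsRegistry, for illustration only.
    class MigrationRegistry {
    public:
        // A migration registers itself for as long as it is running.
        void registerMigration() {
            std::unique_lock<std::mutex> lk(_mutex);
            // New migrations are held off while the registry is locked.
            _cv.wait(lk, [&] { return !_locked; });
            ++_active;
        }
        void unregisterMigration() {
            std::lock_guard<std::mutex> lk(_mutex);
            --_active;
            _cv.notify_all();
        }

        // lock() returns only once no migration is in flight; a paired
        // lock()/unlock() round trip therefore acts as a barrier.
        void lock() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait(lk, [&] { return _active == 0 && !_locked; });
            _locked = true;
        }
        void unlock() {
            std::lock_guard<std::mutex> lk(_mutex);
            _locked = false;
            _cv.notify_all();
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        int _active = 0;
        bool _locked = false;
    };

This mirrors what the new command's typedRun() does below (ActiveMigrationsRegistry lock() followed immediately by unlock()), so the config server's waitForQuiescedCluster() can treat an OK reply from a shard as proof that no migration was still running there when the command executed.
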
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index bbdb2623e75..45f19278511 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -159,6 +159,7 @@ let viewsCommandTests = {
_shardsvrDropDatabase: {skip: isAnInternalCommand},
_shardsvrDropDatabaseParticipant: {skip: isAnInternalCommand},
_shardsvrGetStatsForBalancing: {skip: isAnInternalCommand},
+ _shardsvrJoinMigrations: {skip: isAnInternalCommand},
_shardsvrMovePrimary: {skip: isAnInternalCommand},
_shardsvrMoveRange: {
command: {_shardsvrMoveRange: "test.view"},
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index ec95e98892e..02fc2486ca4 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -87,6 +87,7 @@ const allCommands = {
_shardsvrDropIndexes: {skip: isAnInternalCommand},
_shardsvrCreateCollectionParticipant: {skip: isPrimaryOnly},
_shardsvrGetStatsForBalancing: {skip: isPrimaryOnly},
+ _shardsvrJoinMigrations: {skip: isAnInternalCommand},
_shardsvrMovePrimary: {skip: isPrimaryOnly},
_shardsvrMoveRange: {skip: isPrimaryOnly},
_shardsvrRenameCollection: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index b264ec38ee6..9b8b3a1efda 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -157,6 +157,7 @@ let testCases = {
_shardsvrDropDatabase: {skip: "internal command"},
_shardsvrDropDatabaseParticipant: {skip: "internal command"},
_shardsvrGetStatsForBalancing: {skip: "internal command"},
+ _shardsvrJoinMigrations: {skip: "internal command"},
_shardsvrMovePrimary: {skip: "internal command"},
_shardsvrMoveRange: {
skip:
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index fa1e774f18e..237dad8bcf8 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -419,6 +419,7 @@ env.Library(
'shardsvr_drop_database_participant_command.cpp',
'shardsvr_drop_indexes_command.cpp',
'shardsvr_get_stats_for_balancing_command.cpp',
+ 'shardsvr_join_migrations_command.cpp',
'shardsvr_merge_chunks_command.cpp',
'shardsvr_move_primary_command.cpp',
'shardsvr_move_range_command.cpp',
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
index 797d4c00bbc..6bff2a90f38 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_impl.cpp
@@ -32,10 +32,12 @@
#include "mongo/db/s/balancer/balancer_commands_scheduler_impl.h"
#include "mongo/db/client.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/s/sharding_util.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/grid.h"
#include "mongo/s/request_types/migration_secondary_throttle_options.h"
+#include "mongo/s/request_types/shardsvr_join_migrations_request_gen.h"
#include "mongo/s/shard_id.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/fail_point.h"
@@ -47,6 +49,35 @@ namespace {
MONGO_FAIL_POINT_DEFINE(pauseSubmissionsFailPoint);
MONGO_FAIL_POINT_DEFINE(deferredCleanupCompletedCheckpoint);
+void waitForQuiescedCluster(OperationContext* opCtx) {
+ const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ ShardsvrJoinMigrations joinShardOnMigrationsRequest;
+ joinShardOnMigrationsRequest.setDbName(NamespaceString::kAdminDb);
+
+ auto unquiescedShardIds = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
+
+ const auto responses =
+ sharding_util::sendCommandToShards(opCtx,
+ NamespaceString::kAdminDb.toString(),
+ joinShardOnMigrationsRequest.toBSON({}),
+ unquiescedShardIds,
+ executor,
+ false /*throwOnError*/);
+ for (const auto& r : responses) {
+ auto responseOutcome = r.swResponse.isOK()
+ ? getStatusFromCommandResult(r.swResponse.getValue().data)
+ : r.swResponse.getStatus();
+
+ if (!responseOutcome.isOK()) {
+ LOGV2_WARNING(6648001,
+ "Could not complete _ShardsvrJoinMigrations on shard",
+ "error"_attr = responseOutcome,
+ "shard"_attr = r.shardId);
+ }
+ }
+}
+
+
Status processRemoteResponse(const executor::RemoteCommandResponse& remoteResponse) {
if (!remoteResponse.status.isOK()) {
return remoteResponse.status;
@@ -124,7 +155,7 @@ std::vector<RequestData> rebuildRequestsFromRecoveryInfo(
FindCommandRequest findRequest{MigrationType::ConfigNS};
dbClient.find(std::move(findRequest), ReadPreferenceSetting{}, documentProcessor);
} catch (const DBException& e) {
- LOGV2_ERROR(5847215, "Failed to load requests to recover", "error"_attr = redact(e));
+ LOGV2_ERROR(5847215, "Failed to fetch requests to recover", "error"_attr = redact(e));
}
return rebuiltRequests;
@@ -177,14 +208,26 @@ void BalancerCommandsSchedulerImpl::start(OperationContext* opCtx,
if (!_executor) {
_executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
}
- auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues);
- _numRequestsToRecover = requestsToRecover.size();
- _state = _numRequestsToRecover == 0 ? SchedulerState::Running : SchedulerState::Recovering;
+ _state = SchedulerState::Recovering;
- for (auto& requestToRecover : requestsToRecover) {
- _enqueueRequest(lg, std::move(requestToRecover));
+ try {
+ waitForQuiescedCluster(opCtx);
+ } catch (const DBException& e) {
+ LOGV2_WARNING(
+ 6648002, "Could not join migration activity on shards", "error"_attr = redact(e));
}
+ auto requestsToRecover = rebuildRequestsFromRecoveryInfo(opCtx, defaultValues);
+ _numRequestsToRecover = requestsToRecover.size();
+ if (_numRequestsToRecover == 0) {
+ LOGV2(6648003, "Balancer scheduler recovery complete. Switching to regular execution");
+ _state = SchedulerState::Running;
+ } else {
+ for (auto& requestToRecover : requestsToRecover) {
+ // TODO I'd prefer to simply delete the entries.
+ _enqueueRequest(lg, std::move(requestToRecover));
+ }
+ }
_workerThreadHandle = stdx::thread([this] { _workerThread(); });
}
diff --git a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
index a2f2dc56a29..aab01274138 100644
--- a/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
+++ b/src/mongo/db/s/balancer/balancer_commands_scheduler_test.cpp
@@ -126,6 +126,28 @@ protected:
ConfigServerTestFixture::tearDown();
}
+ /*
+ * Extra setup function to define the whole sequence of (mocked) remote command responses that a
+ * test is expected to receive during its execution.
+ * - Must be invoked before running _scheduler.start()
+ * - The returned future must be stored in a local variable and kept in scope to ensure that the
+ * sequence gets generated
+ */
+ executor::NetworkTestEnv::FutureHandle<void> setRemoteResponses(
+ std::vector<executor::NetworkTestEnv::OnCommandFunction> remoteResponseGenerators = {}) {
+ std::vector<executor::NetworkTestEnv::OnCommandFunction> generatorsWithStartSequence;
+ // Set an OK response for every _shardsvrJoinMigrations command sent out by the start() method
+ // of the commands scheduler
+ for (size_t i = 0; i < kShardList.size(); ++i) {
+ generatorsWithStartSequence.push_back(
+ [&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); });
+ }
+ generatorsWithStartSequence.insert(generatorsWithStartSequence.end(),
+ remoteResponseGenerators.begin(),
+ remoteResponseGenerators.end());
+ return launchAsync([this, g = std::move(generatorsWithStartSequence)] { onCommands(g); });
+ }
+
void configureTargeter(OperationContext* opCtx, ShardId shardId, const HostAndPort& host) {
auto targeter = RemoteCommandTargeterMock::get(
uassertStatusOK(shardRegistry()->getShard(opCtx, shardId))->getTargeter());
@@ -136,8 +158,10 @@ protected:
};
TEST_F(BalancerCommandsSchedulerTest, StartAndStopScheduler) {
+ auto remoteResponsesFuture = setRemoteResponses();
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
_scheduler.stop();
+ remoteResponsesFuture.default_timed_get();
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
@@ -145,16 +169,16 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveChunkCommand) {
globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
auto timesEnteredFailPoint =
deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
+ auto remoteResponsesFuture = setRemoteResponses(
+ {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
+
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
- auto networkResponseFuture = launchAsync([&]() {
- onCommand(
- [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
+
auto futureResponse = _scheduler.requestMoveChunk(
operationContext(), migrateInfo, getMoveChunkSettings(), false /* issuedByRemoteUser */);
ASSERT_OK(futureResponse.getNoThrow());
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
// Ensure DistLock is released correctly
{
@@ -174,6 +198,8 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
auto timesEnteredFailPoint =
deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
+ auto remoteResponsesFuture = setRemoteResponses(
+ {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
ShardsvrMoveRange shardsvrRequest(kNss);
shardsvrRequest.setDbName(NamespaceString::kAdminDb);
@@ -184,14 +210,10 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
moveRangeRequestBase.setMin({});
moveRangeRequestBase.setMax({});
- auto networkResponseFuture = launchAsync([&]() {
- onCommand(
- [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
auto futureResponse = _scheduler.requestMoveRange(
operationContext(), shardsvrRequest, WriteConcernOptions(), false /* issuedByRemoteUser */);
ASSERT_OK(futureResponse.getNoThrow());
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
// Ensure DistLock is released correctly
{
@@ -207,22 +229,21 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulMoveRangeCommand) {
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulMergeChunkCommand) {
+ auto remoteResponsesFuture = setRemoteResponses(
+ {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
- auto networkResponseFuture = launchAsync([&]() {
- onCommand(
- [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
ChunkRange range(BSON("x" << 0), BSON("x" << 20));
ChunkVersion version(1, 1, OID::gen(), Timestamp(10));
auto futureResponse =
_scheduler.requestMergeChunks(operationContext(), kNss, kShardId0, range, version);
ASSERT_OK(futureResponse.getNoThrow());
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, MergeChunkNonexistentShard) {
+ auto remoteResponsesFuture = setRemoteResponses();
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
ChunkRange range(BSON("x" << 0), BSON("x" << 20));
ChunkVersion version(1, 1, OID::gen(), Timestamp(10));
@@ -230,12 +251,11 @@ TEST_F(BalancerCommandsSchedulerTest, MergeChunkNonexistentShard) {
operationContext(), kNss, ShardId("nonexistent"), range, version);
auto shardNotFoundError = Status{ErrorCodes::ShardNotFound, "Shard nonexistent not found"};
ASSERT_EQ(futureResponse.getNoThrow(), shardNotFoundError);
+ remoteResponsesFuture.default_timed_get();
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulAutoSplitVectorCommand) {
- _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
- ChunkType splitChunk = makeChunk(0, kShardId0);
BSONObjBuilder autoSplitVectorResponse;
autoSplitVectorResponse.append("ok", "1");
BSONArrayBuilder splitKeys(autoSplitVectorResponse.subarrayStart("splitKeys"));
@@ -243,11 +263,14 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulAutoSplitVectorCommand) {
splitKeys.append(BSON("x" << 9));
splitKeys.done();
autoSplitVectorResponse.append("continuation", false);
- auto networkResponseFuture = launchAsync([&]() {
- onCommand([&](const executor::RemoteCommandRequest& request) {
+
+ auto remoteResponsesFuture =
+ setRemoteResponses({[&](const executor::RemoteCommandRequest& request) {
return autoSplitVectorResponse.obj();
- });
- });
+ }});
+ _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
+
+ ChunkType splitChunk = makeChunk(0, kShardId0);
auto futureResponse = _scheduler.requestAutoSplitVector(operationContext(),
kNss,
splitChunk.getShard(),
@@ -263,17 +286,15 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulAutoSplitVectorCommand) {
ASSERT_BSONOBJ_EQ(receivedSplitKeys[0], BSON("x" << 7));
ASSERT_BSONOBJ_EQ(receivedSplitKeys[1], BSON("x" << 9));
ASSERT_FALSE(continuation);
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitChunkCommand) {
+ auto remoteResponsesFuture = setRemoteResponses(
+ {[&](const executor::RemoteCommandRequest& request) { return OkReply().toBSON(); }});
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
ChunkType splitChunk = makeChunk(0, kShardId0);
- auto networkResponseFuture = launchAsync([&]() {
- onCommand(
- [&](const executor::RemoteCommandRequest& request) { return BSON("ok" << true); });
- });
auto futureResponse = _scheduler.requestSplitChunk(operationContext(),
kNss,
splitChunk.getShard(),
@@ -283,21 +304,21 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulSplitChunkCommand) {
splitChunk.getMax(),
std::vector<BSONObj>{BSON("x" << 5)});
ASSERT_OK(futureResponse.getNoThrow());
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
_scheduler.stop();
}
TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
- _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
- ChunkType chunk = makeChunk(0, kShardId0);
BSONObjBuilder chunkSizeResponse;
chunkSizeResponse.append("ok", "1");
chunkSizeResponse.append("size", 156);
chunkSizeResponse.append("numObjects", 25);
- auto networkResponseFuture = launchAsync([&]() {
- onCommand(
- [&](const executor::RemoteCommandRequest& request) { return chunkSizeResponse.obj(); });
- });
+ auto remoteResponsesFuture = setRemoteResponses(
+ {[&](const executor::RemoteCommandRequest& request) { return chunkSizeResponse.obj(); }});
+
+ _scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
+ ChunkType chunk = makeChunk(0, kShardId0);
+
auto futureResponse = _scheduler.requestDataSize(operationContext(),
kNss,
chunk.getShard(),
@@ -310,7 +331,7 @@ TEST_F(BalancerCommandsSchedulerTest, SuccessfulRequestChunkDataSizeCommand) {
auto receivedDataSize = swReceivedDataSize.getValue();
ASSERT_EQ(receivedDataSize.sizeBytes, 156);
ASSERT_EQ(receivedDataSize.numObjects, 25);
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
_scheduler.stop();
}
@@ -319,17 +340,16 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenNetworkReturnsError) {
globalFailPointRegistry().find("deferredCleanupCompletedCheckpoint");
auto timesEnteredFailPoint =
deferredCleanupCompletedCheckpoint->setMode(FailPoint::alwaysOn, 0);
-
+ auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"};
+ auto remoteResponsesFuture = setRemoteResponses(
+ {[&](const executor::RemoteCommandRequest& request) { return timeoutError; }});
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
- auto timeoutError = Status{ErrorCodes::NetworkTimeout, "Mock error: network timed out"};
- auto networkResponseFuture = launchAsync([&]() {
- onCommand([&](const executor::RemoteCommandRequest& request) { return timeoutError; });
- });
+
auto futureResponse = _scheduler.requestMoveChunk(
operationContext(), migrateInfo, getMoveChunkSettings(), false /* issuedByRemoteUser */);
ASSERT_EQUALS(futureResponse.getNoThrow(), timeoutError);
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
deferredCleanupCompletedCheckpoint->waitForTimesEntered(timesEnteredFailPoint + 1);
// Ensure DistLock is released correctly
@@ -366,6 +386,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandFailsWhenSchedulerIsStopped) {
TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancerStops) {
SemiFuture<void> futureResponse;
{
+ auto remoteResponsesFuture = setRemoteResponses();
FailPointEnableBlock failPoint("pauseSubmissionsFailPoint");
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
@@ -374,6 +395,7 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancer
getMoveChunkSettings(),
false /* issuedByRemoteUser */);
_scheduler.stop();
+ remoteResponsesFuture.default_timed_get();
}
ASSERT_EQUALS(futureResponse.getNoThrow(),
Status(ErrorCodes::BalancerInterrupted,
@@ -392,12 +414,11 @@ TEST_F(BalancerCommandsSchedulerTest, CommandCanceledIfUnsubmittedBeforeBalancer
TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenRequestIsSubmitted) {
auto opCtx = operationContext();
auto defaultValues = getMigrationRecoveryDefaultValues();
- _scheduler.start(opCtx, getMigrationRecoveryDefaultValues());
MigrateInfo migrateInfo = makeMigrationInfo(0, kShardId1, kShardId0);
auto requestSettings = getMoveChunkSettings(kCustomizedMaxChunkSizeBytes);
auto const serviceContext = getServiceContext();
- auto networkResponseFuture = launchAsync([&]() {
- onCommand([&, serviceContext](const executor::RemoteCommandRequest& request) {
+ auto remoteResponsesFuture =
+ setRemoteResponses({[&, serviceContext](const executor::RemoteCommandRequest& request) {
ThreadClient tc("Test", getGlobalServiceContext());
auto opCtxHolder = Client::getCurrent()->makeOperationContext();
// As long as the request is not completed, a persisted recovery document should
@@ -426,14 +447,13 @@ TEST_F(BalancerCommandsSchedulerTest, MoveChunkCommandGetsPersistedOnDiskWhenReq
boost::none);
ASSERT_BSONOBJ_EQ(originalCommandInfo.serialise(), recoveredCommand->serialise());
- return BSON("ok" << true);
- });
- });
-
+ return OkReply().toBSON();
+ }});
+ _scheduler.start(opCtx, getMigrationRecoveryDefaultValues());
auto deferredResponse = _scheduler.requestMoveChunk(
operationContext(), migrateInfo, requestSettings, false /* issuedByRemoteUser */);
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
_scheduler.stop();
}
@@ -460,20 +480,18 @@ TEST_F(BalancerCommandsSchedulerTest, PersistedCommandsAreReissuedWhenRecovering
// 2. Once started, the persisted document should trigger the remote execution of a request...
auto defaultValues = getMigrationRecoveryDefaultValues();
+ auto moveChunkRemoteResponseGenerator = [&](const executor::RemoteCommandRequest& request) {
+ auto expectedCommandInfo = MoveChunkCommandInfo::recoverFrom(recoveryInfo, defaultValues);
+ // 3. ... Which content should match the recovery doc & configuration.
+ ASSERT_BSONOBJ_EQ(expectedCommandInfo->serialise(), request.cmdObj);
+ return OkReply().toBSON();
+ };
+ auto remoteResponsesFuture = setRemoteResponses({moveChunkRemoteResponseGenerator});
_scheduler.start(opCtx, defaultValues);
- auto networkResponseFuture = launchAsync([&]() {
- onCommand([&](const executor::RemoteCommandRequest& request) {
- auto expectedCommandInfo =
- MoveChunkCommandInfo::recoverFrom(recoveryInfo, defaultValues);
- // 3. ... Which content should match the recovery doc & configuration.
- ASSERT_BSONOBJ_EQ(expectedCommandInfo->serialise(), request.cmdObj);
- return BSON("ok" << true);
- });
- });
// 4. Once the recovery phase is complete, no persisted documents should remain
// (stop() is invoked to ensure that the observed state is stable).
- networkResponseFuture.default_timed_get();
+ remoteResponsesFuture.default_timed_get();
_scheduler.stop();
auto persistedCommandDocs = getPersistedCommandDocuments(operationContext());
ASSERT_EQUALS(0, persistedCommandDocs.size());
@@ -484,6 +502,7 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL
FailPoint* failpoint = globalFailPointRegistry().find("pauseSubmissionsFailPoint");
failpoint->setMode(FailPoint::Mode::alwaysOn);
{
+ auto remoteResponsesFuture = setRemoteResponses();
_scheduler.start(operationContext(), getMigrationRecoveryDefaultValues());
opCtx = Client::getCurrent()->getOperationContext();
const std::string whyMessage(str::stream()
@@ -497,6 +516,7 @@ TEST_F(BalancerCommandsSchedulerTest, DistLockPreventsMoveChunkWithConcurrentDDL
migrateInfo,
getMoveChunkSettings(),
false /* issuedByRemoteUser */);
+ remoteResponsesFuture.default_timed_get();
ASSERT_EQ(
futureResponse.getNoThrow(),
Status(ErrorCodes::LockBusy, "Failed to acquire dist lock testDb.testColl locally"));
diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp
index 002d65d22dc..9bd6c0ae795 100644
--- a/src/mongo/db/s/sharding_util.cpp
+++ b/src/mongo/db/s/sharding_util.cpp
@@ -64,7 +64,8 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
StringData dbName,
const BSONObj& command,
const std::vector<ShardId>& shardIds,
- const std::shared_ptr<executor::TaskExecutor>& executor) {
+ const std::shared_ptr<executor::TaskExecutor>& executor,
+ const bool throwOnError) {
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& shardId : shardIds) {
requests.emplace_back(shardId, command);
@@ -89,17 +90,20 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
// Retrieve the responses and throw at the first failure.
auto response = ars.next();
- const auto errorContext = "Failed command {} for database '{}' on shard '{}'"_format(
- command.toString(), dbName, StringData{response.shardId});
+ if (throwOnError) {
+ const auto errorContext =
+ "Failed command {} for database '{}' on shard '{}'"_format(
+ command.toString(), dbName, StringData{response.shardId});
- auto shardResponse =
- uassertStatusOKWithContext(std::move(response.swResponse), errorContext);
+ auto shardResponse =
+ uassertStatusOKWithContext(std::move(response.swResponse), errorContext);
- auto status = getStatusFromCommandResult(shardResponse.data);
- uassertStatusOKWithContext(status, errorContext);
+ auto status = getStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(status, errorContext);
- auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
- uassertStatusOKWithContext(wcStatus, errorContext);
+ auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(wcStatus, errorContext);
+ }
responses.push_back(std::move(response));
}
diff --git a/src/mongo/db/s/sharding_util.h b/src/mongo/db/s/sharding_util.h
index 3484b680748..051b90b595d 100644
--- a/src/mongo/db/s/sharding_util.h
+++ b/src/mongo/db/s/sharding_util.h
@@ -57,7 +57,8 @@ std::vector<AsyncRequestsSender::Response> sendCommandToShards(
StringData dbName,
const BSONObj& command,
const std::vector<ShardId>& shardIds,
- const std::shared_ptr<executor::TaskExecutor>& executor);
+ const std::shared_ptr<executor::TaskExecutor>& executor,
+ bool throwOnError = true);
/**
* Unset the `noAutosplit` and `maxChunkSizeBytes` fields from:
diff --git a/src/mongo/db/s/shardsvr_join_migrations_command.cpp b/src/mongo/db/s/shardsvr_join_migrations_command.cpp
new file mode 100644
index 00000000000..4305b860179
--- /dev/null
+++ b/src/mongo/db/s/shardsvr_join_migrations_command.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/active_migrations_registry.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/s/request_types/shardsvr_join_migrations_request_gen.h"
+
+
+namespace mongo {
+namespace {
+class ShardsvrJoinMigrationsCommand final : public TypedCommand<ShardsvrJoinMigrationsCommand> {
+public:
+ using Request = ShardsvrJoinMigrations;
+
+ bool skipApiVersionCheck() const override {
+ // Internal command (config -> shard).
+ return true;
+ }
+
+ std::string help() const override {
+ return "Internal command invoked by the config server to join any chunk migration activity "
+ "executed by the shard";
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
+ {
+ Lock::GlobalLock lk(opCtx, MODE_IX);
+ uassert(ErrorCodes::InterruptedDueToReplStateChange,
+ "Not primary while trying to join chunk migration",
+ repl::ReplicationCoordinator::get(opCtx)->getMemberState().primary());
+ }
+
+ auto& activeMigrationRegistry = ActiveMigrationsRegistry::get(opCtx);
+ activeMigrationRegistry.lock(opCtx, kRegistryLockReason);
+ activeMigrationRegistry.unlock(kRegistryLockReason);
+ }
+
+ private:
+ static constexpr char kRegistryLockReason[] = "Running _shardsvrJoinMigrations";
+
+ NamespaceString ns() const override {
+ return {request().getDbName(), ""};
+ }
+
+ bool supportsWriteConcern() const override {
+ return false;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+} _shardsvrJoinMigrationsCmd;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index ddcd048fe64..336e7a2ff98 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -202,6 +202,7 @@ env.Library(
'request_types/flush_resharding_state_change.idl',
'request_types/flush_routing_table_cache_updates.idl',
'request_types/get_database_version.idl',
+ 'request_types/shardsvr_join_migrations_request.idl',
'request_types/merge_chunk_request.idl',
'request_types/migration_secondary_throttle_options.cpp',
'request_types/move_chunk_request.cpp',
diff --git a/src/mongo/s/request_types/shardsvr_join_migrations_request.idl b/src/mongo/s/request_types/shardsvr_join_migrations_request.idl
new file mode 100644
index 00000000000..e779dd5da84
--- /dev/null
+++ b/src/mongo/s/request_types/shardsvr_join_migrations_request.idl
@@ -0,0 +1,42 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+commands:
+ _shardsvrJoinMigrations:
+ command_name: _shardsvrJoinMigrations
+ cpp_name: ShardsvrJoinMigrations
+ description: "Command to synchronise the caller with the completion of any chunk migration activity performed by the shard (as either donor or recipient)"
+ namespace: ignored
+ api_version: ""
+ strict: false