author     Matt Broadstone <mbroadst@mongodb.com>            2022-05-04 13:30:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-05-04 15:43:35 +0000
commit     9532f19af93fdb0cb7feea7bda248a6ac1ae1e6c (patch)
tree       fb2cb7f88a780243383ba459fd20993cc370b920 /src/mongo/db/serverless
parent     dee78d953c905a26e84741a5d40f8a463c594759 (diff)
SERVER-66004 Improve shard split state transition logging
Diffstat (limited to 'src/mongo/db/serverless')
3 files changed, 92 insertions, 98 deletions
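Beyond the log-message cleanup, the patch threads two distinct cancellation tokens through the donor state machine: a primary-service token (`primaryToken`) that fires on stepdown or shutdown, and an abort token (`abortToken`) used for the steps that `abortShardSplit` may cancel, such as split-acceptance monitoring. The sketch below is a minimal standalone illustration of that two-token layout using C++20 `std::stop_token` rather than MongoDB's `CancellationSource`/`CancellationToken`; the linkage (cancelling the primary token also cancels the abort token) is an assumption about how the tokens relate, not code from this patch.

```cpp
// Minimal sketch of a primary/abort token pair, assuming the abort token is
// derived from the primary token. Uses std::stop_token, not MongoDB's API.
#include <cassert>
#include <stop_token>

int main() {
    std::stop_source primarySource;  // cancelled on stepdown/shutdown
    std::stop_source abortSource;    // cancelled by an 'abortShardSplit' command

    // Link them: cancelling the primary token also cancels the abort token.
    std::stop_callback link{primarySource.get_token(),
                            [&] { abortSource.request_stop(); }};

    std::stop_token primaryToken = primarySource.get_token();
    std::stop_token abortToken = abortSource.get_token();

    abortSource.request_stop();            // an abort was requested
    assert(abortToken.stop_requested());   // abortable work is cancelled
    assert(!primaryToken.stop_requested()); // primary-only work (e.g. persisting
                                            // the decision) keeps running
    return 0;
}
```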
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 724bc604fdb..4ea4779f462 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -147,7 +147,7 @@ namespace detail {
 SemiFuture<void> makeRecipientAcceptSplitFuture(
     std::shared_ptr<executor::TaskExecutor> taskExecutor,
-    const CancellationToken& token,
+    const CancellationToken& abortToken,
     const ConnectionString& recipientConnectionString) {
 
     // build a vector of single server discovery monitors to listen for heartbeats
@@ -177,11 +177,7 @@ SemiFuture<void> makeRecipientAcceptSplitFuture(
         monitors.back()->init();
     }
 
-    LOGV2(6142508,
-          "Monitoring recipient nodes for split acceptance.",
-          "recipientConnectionString"_attr = recipientConnectionString);
-
-    return future_util::withCancellation(listener->getFuture(), token)
+    return future_util::withCancellation(listener->getFuture(), abortToken)
         .thenRunOn(taskExecutor)
         // Preserve lifetime of listener and monitor until the future is fulfilled and remove the
         // listener.
@@ -287,7 +283,7 @@ ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
       }())) {}
 
 void ShardSplitDonorService::DonorStateMachine::tryAbort() {
-    LOGV2(6086502, "Aborting shard split", "id"_attr = _migrationId);
+    LOGV2(6086502, "Received 'abortShardSplit' command.", "id"_attr = _migrationId);
     stdx::lock_guard<Latch> lg(_mutex);
     _abortRequested = true;
     if (_abortSource) {
@@ -296,13 +292,12 @@ void ShardSplitDonorService::DonorStateMachine::tryAbort() {
 }
 
 void ShardSplitDonorService::DonorStateMachine::tryForget() {
-    LOGV2(6236601, "Forgetting shard split", "id"_attr = _migrationId);
-
+    LOGV2(6236601, "Received 'forgetShardSplit' command.", "id"_attr = _migrationId);
     stdx::lock_guard<Latch> lg(_mutex);
     if (_forgetShardSplitReceivedPromise.getFuture().isReady()) {
-        LOGV2(6236602, "Donor Forget Migration promise is already ready", "id"_attr = _migrationId);
         return;
     }
+
     _forgetShardSplitReceivedPromise.emplaceValue();
 }
 
@@ -339,8 +334,8 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
     _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor);
 
     const bool shouldRemoveStateDocumentOnRecipient = [&]() {
-        auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
         stdx::lock_guard<Latch> lg(_mutex);
+        auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
        return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc);
     }();
 
@@ -349,9 +344,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
     if (shouldRemoveStateDocumentOnRecipient) {
         pauseShardSplitBeforeRecipientCleanup.pauseWhileSet();
     }
-    LOGV2(6309000,
-          "Cancelling and cleaning up shard split operation on recipient in blocking state.",
-          "id"_attr = _migrationId);
     _decisionPromise.setWith([&] {
         return ExecutorFuture(**executor)
             .then([this, executor, primaryToken, anchor = shared_from_this()] {
@@ -362,9 +354,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
                 return _cleanRecipientStateDoc(executor, primaryToken);
             })
             .then([this, executor, migrationId = _migrationId]() {
-                LOGV2(6236607,
-                      "Cleanup stale shard split operation on recipient.",
-                      "migrationId"_attr = migrationId);
                 return DurableState{ShardSplitDonorStateEnum::kCommitted};
             })
             .unsafeToInlineFuture();
@@ -378,17 +367,18 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
     }
 
     auto isConfigValidWithStatus = [&]() {
+        stdx::lock_guard<Latch> lg(_mutex);
         auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
         invariant(replCoord);
-        stdx::lock_guard<Latch> lg(_mutex);
         return serverless::validateRecipientNodesForShardSplit(_stateDoc, replCoord->getConfig());
     }();
 
     if (!isConfigValidWithStatus.isOK()) {
         LOGV2_ERROR(6395900,
-                    "Failed to validate recipient nodes for shard split",
+                    "Failed to validate recipient nodes for shard split.",
                     "id"_attr = _migrationId,
-                    "error"_attr = isConfigValidWithStatus.reason());
-        _decisionPromise.emplaceValue(DurableState{ShardSplitDonorStateEnum::kCommitted});
+                    "status"_attr = isConfigValidWithStatus);
+        _decisionPromise.emplaceValue(
+            DurableState{ShardSplitDonorStateEnum::kAborted, isConfigValidWithStatus});
         _completionPromise.setFrom(
             _decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture());
         return _completionPromise.getFuture().semi();
@@ -403,11 +393,11 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
 
     _decisionPromise.setWith([&] {
         return ExecutorFuture(**executor)
-            .then([this, executor, primaryToken] {
+            .then([this, executor, primaryToken, abortToken] {
                 // Note we do not use the abort split token here because the abortShardSplit
                 // command waits for a decision to be persisted which will not happen if
                 // inserting the initial state document fails.
-                return _enterBlockingOrAbortedState(executor, primaryToken);
+                return _enterBlockingOrAbortedState(executor, primaryToken, abortToken);
             })
             .then([this, executor, abortToken] {
                 checkForTokenInterrupt(abortToken);
@@ -427,11 +417,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
                 return _waitForRecipientToAcceptSplit(executor, abortToken);
             })
             .then([this, executor, abortToken] {
-                LOGV2(6086503,
-                      "Shard split completed",
-                      "id"_attr = _migrationId,
-                      "abortReason"_attr = _abortReason);
-
                 stdx::lock_guard<Latch> lg(_mutex);
                 return DurableState{_stateDoc.getState(), _abortReason};
             })
@@ -452,10 +437,27 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
             .semi()
             .ignoreValue()
             .thenRunOn(**executor)
-            .then([this, anchor = shared_from_this(), executor, primaryToken] {
+            .then([this, executor, primaryToken] {
                 auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-                pauseShardSplitAfterDecision.pauseWhileSetAndNotCanceled(opCtx.get(), primaryToken);
-                return _waitForForgetCmdThenMarkGarbageCollectible(executor, primaryToken);
+                if (MONGO_unlikely(pauseShardSplitAfterDecision.shouldFail())) {
+                    pauseShardSplitAfterDecision.pauseWhileSetAndNotCanceled(opCtx.get(),
+                                                                             primaryToken);
+                }
+
+                return _waitForForgetCmdThenMarkGarbageCollectable(executor, primaryToken);
+            })
+            .onCompletion([this, anchor = shared_from_this(), primaryToken](Status status) {
+                stdx::lock_guard<Latch> lg(_mutex);
+                LOGV2(8423357,
+                      "Marked shard split as garbage collectable.",
+                      "id"_attr = _stateDoc.getId(),
+                      "expireAt"_attr = _stateDoc.getExpireAt());
+
+                LOGV2(8423356,
+                      "Shard split completed.",
+                      "id"_attr = _stateDoc.getId(),
+                      "status"_attr = status,
+                      "abortReason"_attr = _abortReason);
             })
             .unsafeToInlineFuture());
 
@@ -508,8 +510,8 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestam
 }
 
 ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfigToDonor(
-    const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
-    checkForTokenInterrupt(token);
+    const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+    checkForTokenInterrupt(abortToken);
 
     {
         stdx::lock_guard<Latch> lg(_mutex);
@@ -523,9 +525,9 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi
     invariant(replCoord);
 
     LOGV2(6309100,
-          "Generating and applying a split config",
+          "Applying the split config.",
           "id"_attr = _migrationId,
-          "conf"_attr = replCoord->getConfig());
+          "config"_attr = replCoord->getConfig());
 
     return AsyncTry([this] {
                auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
@@ -551,50 +553,45 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi
                client.runCommand(NamespaceString::kAdminDb.toString(),
                                  BSON("replSetReconfig" << newConfig.toBSON()),
                                  result);
-               uassert(
-                   ErrorCodes::BadValue, "Invalid return value for replSetReconfig", returnValue);
+               uassert(ErrorCodes::BadValue,
+                       "Invalid return value for 'replSetReconfig' command.",
+                       returnValue);
                uassertStatusOK(getStatusFromCommandResult(result));
            })
         .until([](Status status) { return status.isOK(); })
         .withBackoffBetweenIterations(kExponentialBackoff)
-        .on(**executor, token)
-        .then([this] {
-            LOGV2(6309101,
-                  "Split config has been generated and committed.",
-                  "id"_attr = _migrationId);
-        });
+        .on(**executor, abortToken);
 }
 
 ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplit(
-    const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
-    checkForTokenInterrupt(token);
+    const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+    checkForTokenInterrupt(abortToken);
 
     {
         stdx::lock_guard<Latch> lg(_mutex);
         if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
             return ExecutorFuture(**executor);
         }
-
-        LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId);
     }
 
+    LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId);
+
     return ExecutorFuture(**executor)
         .then([&]() { return _splitAcceptancePromise.getFuture(); })
-        .then([this, executor, token] {
-            LOGV2(6142503,
-                  "Recipient has accepted the split, committing decision.",
-                  "id"_attr = _migrationId);
-
-            return _updateStateDocument(executor, token, ShardSplitDonorStateEnum::kCommitted)
-                .then([this, executor, token](repl::OpTime opTime) {
-                    return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+        .then([this, executor, abortToken] {
+            LOGV2(6142503, "Entering 'committed' state.", "id"_attr = _stateDoc.getId());
+
+            return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kCommitted)
+                .then([this, executor, abortToken](repl::OpTime opTime) {
+                    return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
                 });
         });
 }
 
 ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState(
-    const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) {
-
+    const ScopedTaskExecutorPtr& executor,
+    const CancellationToken& primaryToken,
+    const CancellationToken& abortToken) {
     ShardSplitDonorStateEnum nextState;
     {
         stdx::lock_guard<Latch> lg(_mutex);
@@ -606,13 +603,15 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
             }
             _abortReason =
-                Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit.");
+                Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'.");
             BSONObjBuilder bob;
             _abortReason->serializeErrorToBSON(&bob);
             _stateDoc.setAbortReason(bob.obj());
             _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
                                   Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
             nextState = ShardSplitDonorStateEnum::kAborted;
+
+            LOGV2(8423355, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId());
         } else {
             auto recipientTagName = _stateDoc.getRecipientTagName();
             invariant(recipientTagName);
@@ -634,8 +633,13 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
                     ? *_splitAcceptanceTaskExecutorForTest
                     : _shardSplitService->getInstanceCleanupExecutor();
 
+                LOGV2(6142508,
+                      "Monitoring recipient nodes for split acceptance.",
+                      "id"_attr = _migrationId,
+                      "recipientConnectionString"_attr = recipientConnectionString);
+
                 return detail::makeRecipientAcceptSplitFuture(
-                           executor, primaryServiceToken, recipientConnectionString)
+                           executor, abortToken, recipientConnectionString)
                     .unsafeToInlineFuture();
             });
 
@@ -649,14 +653,11 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
             _stateDoc.setRecipientConnectionString(recipientConnectionString);
             _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
             nextState = ShardSplitDonorStateEnum::kBlocking;
+
+            LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId());
         }
     }
 
-    LOGV2(6086504,
-          "Inserting initial state document.",
-          "id"_attr = _migrationId,
-          "state"_attr = nextState);
-
     return AsyncTry([this, nextState, uuid = _migrationId]() {
                auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
                auto opCtx = opCtxHolder.get();
@@ -705,13 +706,13 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
                return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
            })
         .withBackoffBetweenIterations(kExponentialBackoff)
-        .on(**executor, primaryServiceToken)
-        .then([this, executor, primaryServiceToken](repl::OpTime opTime) {
-            return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryServiceToken);
+        .on(**executor, primaryToken)
+        .then([this, executor, primaryToken](repl::OpTime opTime) {
+            return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
         })
         .then([this, executor, nextState]() {
             uassert(ErrorCodes::TenantMigrationAborted,
-                    "Shard split operation aborted",
+                    "Shard split operation aborted.",
                     nextState != ShardSplitDonorStateEnum::kAborted);
         });
 }
@@ -778,7 +779,6 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForMajority
 
 void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
     const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
-
     auto timeoutFuture =
         (*executor)->sleepFor(Milliseconds(repl::shardSplitTimeoutMS.load()), abortToken);
@@ -831,7 +831,7 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
     // operation failed. In either case we abort the migration.
     if (abortToken.isCanceled()) {
         statusWithState =
-            Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit.");
+            Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit' command.");
     }
 
     {
@@ -865,52 +865,45 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
 }
 
 ExecutorFuture<void>
-ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible(
-    const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
-    LOGV2(6236603,
-          "Waiting to receive 'forgetShardSplit' command.",
-          "migrationId"_attr = _migrationId);
+ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectable(
+    const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
+    LOGV2(6236603, "Waiting to receive 'forgetShardSplit' command.", "id"_attr = _migrationId);
 
     auto expiredAt = [&]() {
         stdx::lock_guard<Latch> lg(_mutex);
         return _stateDoc.getExpireAt();
     }();
 
     if (expiredAt) {
-        LOGV2(6236604, "expiredAt is already set", "migrationId"_attr = _migrationId);
         return ExecutorFuture(**executor);
     }
 
-    return future_util::withCancellation(_forgetShardSplitReceivedPromise.getFuture(), token)
+    return future_util::withCancellation(_forgetShardSplitReceivedPromise.getFuture(), primaryToken)
         .thenRunOn(**executor)
-        .then([this, self = shared_from_this(), executor, token] {
-            LOGV2(6236606,
-                  "Marking shard split as garbage-collectable.",
-                  "migrationId"_attr = _migrationId);
+        .then([this, self = shared_from_this(), executor, primaryToken] {
+            LOGV2(6236606, "Marking shard split as garbage-collectable.", "id"_attr = _migrationId);
             stdx::lock_guard<Latch> lg(_mutex);
             _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
                                   Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
 
             return AsyncTry([this, self = shared_from_this()] {
-                       auto opCtxHolder = cc().makeOperationContext();
-                       auto opCtx = opCtxHolder.get();
-
-                       uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
+                       auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+                       uassertStatusOK(serverless::updateStateDoc(opCtx.get(), _stateDoc));
                        return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
                    })
                 .until(
                     [](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
                 .withBackoffBetweenIterations(kExponentialBackoff)
-                .on(**executor, token);
+                .on(**executor, primaryToken);
         })
-        .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
-            return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+        .then([this, self = shared_from_this(), executor, primaryToken](repl::OpTime opTime) {
+            return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
         });
 }
 
 ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientStateDoc(
-    const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
-
+    const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
+    LOGV2(6309000, "Cleaning up shard split operation on recipient.", "id"_attr = _migrationId);
     return AsyncTry([this, self = shared_from_this()] {
                auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
               auto deleted =
@@ -923,7 +916,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientS
            })
        .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
        .withBackoffBetweenIterations(kExponentialBackoff)
-        .on(**executor, token)
+        .on(**executor, primaryToken)
        .ignoreValue();
 }
 } // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index 787d09372be..190a57b3cd3 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -155,19 +155,20 @@ public:
 private:
     // Tasks
     ExecutorFuture<void> _enterBlockingOrAbortedState(const ScopedTaskExecutorPtr& executor,
-                                                      const CancellationToken& token);
+                                                      const CancellationToken& primaryToken,
+                                                      const CancellationToken& abortToken);
 
     ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp(
-        const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+        const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken);
 
     ExecutorFuture<void> _applySplitConfigToDonor(const ScopedTaskExecutorPtr& executor,
-                                                  const CancellationToken& token);
+                                                  const CancellationToken& abortToken);
 
     ExecutorFuture<void> _waitForRecipientToAcceptSplit(const ScopedTaskExecutorPtr& executor,
-                                                        const CancellationToken& token);
+                                                        const CancellationToken& abortToken);
 
-    ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible(
-        const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+    ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectable(
+        const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken);
 
     ExecutorFuture<DurableState> _handleErrorOrEnterAbortedState(
         StatusWith<DurableState> durableState,
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 329409cc6c4..aad81bbf5dc 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -579,7 +579,7 @@ TEST_F(ShardSplitDonorServiceTest, AbortDueToRecipientNodesValidation) {
 
     auto result = decisionFuture.get();
 
-    ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
+    ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
 
     ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
     ASSERT_FALSE(serviceInstance->isGarbageCollectable());
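Several call sites above persist the donor state document via AsyncTry(...).until(...).withBackoffBetweenIterations(kExponentialBackoff).on(**executor, token). As a rough synchronous analogue (a hypothetical standalone helper, not MongoDB's AsyncTry API), the retry-with-backoff loop looks like this:

```cpp
// Sketch of a retry loop with capped exponential backoff, mirroring the
// AsyncTry(body).until(pred).withBackoffBetweenIterations(...) pattern.
// retryWithBackoff and its parameters are hypothetical names.
#include <algorithm>
#include <chrono>
#include <thread>

template <typename Body, typename Until>
auto retryWithBackoff(Body body,
                      Until until,
                      std::chrono::milliseconds base = std::chrono::milliseconds{50},
                      std::chrono::milliseconds cap = std::chrono::milliseconds{5000}) {
    auto delay = base;
    while (true) {
        auto result = body();  // e.g. attempt to write the state document
        if (until(result)) {   // stop once the predicate accepts the result
            return result;
        }
        std::this_thread::sleep_for(delay);
        delay = std::min(delay * 2, cap);  // exponential backoff with a cap
    }
}

int main() {
    int attempts = 0;
    // Succeeds on the third attempt; earlier attempts simulate transient errors.
    bool ok = retryWithBackoff([&] { return ++attempts == 3; },
                               [](bool succeeded) { return succeeded; },
                               std::chrono::milliseconds{1});
    return ok ? 0 : 1;
}
```

In the patched code the token passed to `.on(...)` determines which retry loops an `abortShardSplit` can interrupt (those on `abortToken`) versus those that only stop on stepdown or shutdown (those on `primaryToken`).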