author     Matt Broadstone <mbroadst@mongodb.com>            2022-05-04 13:30:29 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-05-04 15:43:35 +0000
commit     9532f19af93fdb0cb7feea7bda248a6ac1ae1e6c (patch)
tree       fb2cb7f88a780243383ba459fd20993cc370b920 /src/mongo/db/serverless
parent     dee78d953c905a26e84741a5d40f8a463c594759 (diff)
download   mongo-9532f19af93fdb0cb7feea7bda248a6ac1ae1e6c.tar.gz
SERVER-66004 Improve shard split state transition logging
Diffstat (limited to 'src/mongo/db/serverless')
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp       | 175
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.h         |  13
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp  |   2
3 files changed, 92 insertions, 98 deletions
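
The thrust of the change is visible in the hunks below: scattered ad-hoc log
lines are replaced with one consistent LOGV2 message per state the donor
enters. A minimal sketch of the pattern, assembled from the diff itself (the
log IDs and attribute names are taken verbatim from the hunks that follow):

    // Each transition logs a stable ID plus the operation id, and nothing else.
    LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId());
    LOGV2(6142503, "Entering 'committed' state.", "id"_attr = _stateDoc.getId());
    LOGV2(8423355, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId());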
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 724bc604fdb..4ea4779f462 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -147,7 +147,7 @@ namespace detail {
SemiFuture<void> makeRecipientAcceptSplitFuture(
std::shared_ptr<executor::TaskExecutor> taskExecutor,
- const CancellationToken& token,
+ const CancellationToken& abortToken,
const ConnectionString& recipientConnectionString) {
// build a vector of single server discovery monitors to listen for heartbeats
@@ -177,11 +177,7 @@ SemiFuture<void> makeRecipientAcceptSplitFuture(
monitors.back()->init();
}
- LOGV2(6142508,
- "Monitoring recipient nodes for split acceptance.",
- "recipientConnectionString"_attr = recipientConnectionString);
-
- return future_util::withCancellation(listener->getFuture(), token)
+ return future_util::withCancellation(listener->getFuture(), abortToken)
.thenRunOn(taskExecutor)
// Preserve lifetime of listener and monitor until the future is fulfilled and remove the
// listener.
@@ -287,7 +283,7 @@ ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
}())) {}
void ShardSplitDonorService::DonorStateMachine::tryAbort() {
- LOGV2(6086502, "Aborting shard split", "id"_attr = _migrationId);
+ LOGV2(6086502, "Received 'abortShardSplit' command.", "id"_attr = _migrationId);
stdx::lock_guard<Latch> lg(_mutex);
_abortRequested = true;
if (_abortSource) {
@@ -296,13 +292,12 @@ void ShardSplitDonorService::DonorStateMachine::tryAbort() {
}
void ShardSplitDonorService::DonorStateMachine::tryForget() {
- LOGV2(6236601, "Forgetting shard split", "id"_attr = _migrationId);
-
+ LOGV2(6236601, "Received 'forgetShardSplit' command.", "id"_attr = _migrationId);
stdx::lock_guard<Latch> lg(_mutex);
if (_forgetShardSplitReceivedPromise.getFuture().isReady()) {
- LOGV2(6236602, "Donor Forget Migration promise is already ready", "id"_attr = _migrationId);
return;
}
+
_forgetShardSplitReceivedPromise.emplaceValue();
}
@@ -339,8 +334,8 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
_cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor);
const bool shouldRemoveStateDocumentOnRecipient = [&]() {
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
stdx::lock_guard<Latch> lg(_mutex);
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc);
}();
@@ -349,9 +344,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
pauseShardSplitBeforeRecipientCleanup.pauseWhileSet();
}
- LOGV2(6309000,
- "Cancelling and cleaning up shard split operation on recipient in blocking state.",
- "id"_attr = _migrationId);
_decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
.then([this, executor, primaryToken, anchor = shared_from_this()] {
@@ -362,9 +354,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
return _cleanRecipientStateDoc(executor, primaryToken);
})
.then([this, executor, migrationId = _migrationId]() {
- LOGV2(6236607,
- "Cleanup stale shard split operation on recipient.",
- "migrationId"_attr = migrationId);
return DurableState{ShardSplitDonorStateEnum::kCommitted};
})
.unsafeToInlineFuture();
@@ -378,17 +367,18 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
}
auto isConfigValidWithStatus = [&]() {
+ stdx::lock_guard<Latch> lg(_mutex);
auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
invariant(replCoord);
- stdx::lock_guard<Latch> lg(_mutex);
return serverless::validateRecipientNodesForShardSplit(_stateDoc, replCoord->getConfig());
}();
if (!isConfigValidWithStatus.isOK()) {
LOGV2_ERROR(6395900,
- "Failed to validate recipient nodes for shard split",
+ "Failed to validate recipient nodes for shard split.",
"id"_attr = _migrationId,
- "error"_attr = isConfigValidWithStatus.reason());
- _decisionPromise.emplaceValue(DurableState{ShardSplitDonorStateEnum::kCommitted});
+ "status"_attr = isConfigValidWithStatus);
+ _decisionPromise.emplaceValue(
+ DurableState{ShardSplitDonorStateEnum::kAborted, isConfigValidWithStatus});
_completionPromise.setFrom(
_decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture());
return _completionPromise.getFuture().semi();
@@ -403,11 +393,11 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
_decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
- .then([this, executor, primaryToken] {
+ .then([this, executor, primaryToken, abortToken] {
// Note we do not use the abort split token here because the abortShardSplit
// command waits for a decision to be persisted which will not happen if
// inserting the initial state document fails.
- return _enterBlockingOrAbortedState(executor, primaryToken);
+ return _enterBlockingOrAbortedState(executor, primaryToken, abortToken);
})
.then([this, executor, abortToken] {
checkForTokenInterrupt(abortToken);
@@ -427,11 +417,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
return _waitForRecipientToAcceptSplit(executor, abortToken);
})
.then([this, executor, abortToken] {
- LOGV2(6086503,
- "Shard split completed",
- "id"_attr = _migrationId,
- "abortReason"_attr = _abortReason);
-
stdx::lock_guard<Latch> lg(_mutex);
return DurableState{_stateDoc.getState(), _abortReason};
})
@@ -452,10 +437,27 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
.semi()
.ignoreValue()
.thenRunOn(**executor)
- .then([this, anchor = shared_from_this(), executor, primaryToken] {
+ .then([this, executor, primaryToken] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- pauseShardSplitAfterDecision.pauseWhileSetAndNotCanceled(opCtx.get(), primaryToken);
- return _waitForForgetCmdThenMarkGarbageCollectible(executor, primaryToken);
+ if (MONGO_unlikely(pauseShardSplitAfterDecision.shouldFail())) {
+ pauseShardSplitAfterDecision.pauseWhileSetAndNotCanceled(opCtx.get(),
+ primaryToken);
+ }
+
+ return _waitForForgetCmdThenMarkGarbageCollectable(executor, primaryToken);
+ })
+ .onCompletion([this, anchor = shared_from_this(), primaryToken](Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2(8423357,
+ "Marked shard split as garbage collectable.",
+ "id"_attr = _stateDoc.getId(),
+ "expireAt"_attr = _stateDoc.getExpireAt());
+
+ LOGV2(8423356,
+ "Shard split completed.",
+ "id"_attr = _stateDoc.getId(),
+ "status"_attr = status,
+ "abortReason"_attr = _abortReason);
})
.unsafeToInlineFuture());
@@ -508,8 +510,8 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestam
}
ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfigToDonor(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
- checkForTokenInterrupt(token);
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+ checkForTokenInterrupt(abortToken);
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -523,9 +525,9 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi
invariant(replCoord);
LOGV2(6309100,
- "Generating and applying a split config",
+ "Applying the split config.",
"id"_attr = _migrationId,
- "conf"_attr = replCoord->getConfig());
+ "config"_attr = replCoord->getConfig());
return AsyncTry([this] {
auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
@@ -551,50 +553,45 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi
client.runCommand(NamespaceString::kAdminDb.toString(),
BSON("replSetReconfig" << newConfig.toBSON()),
result);
- uassert(
- ErrorCodes::BadValue, "Invalid return value for replSetReconfig", returnValue);
+ uassert(ErrorCodes::BadValue,
+ "Invalid return value for 'replSetReconfig' command.",
+ returnValue);
uassertStatusOK(getStatusFromCommandResult(result));
})
.until([](Status status) { return status.isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, token)
- .then([this] {
- LOGV2(6309101,
- "Split config has been generated and committed.",
- "id"_attr = _migrationId);
- });
+ .on(**executor, abortToken);
}
ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplit(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
- checkForTokenInterrupt(token);
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+ checkForTokenInterrupt(abortToken);
{
stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
return ExecutorFuture(**executor);
}
-
- LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId);
}
+ LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId);
+
return ExecutorFuture(**executor)
.then([&]() { return _splitAcceptancePromise.getFuture(); })
- .then([this, executor, token] {
- LOGV2(6142503,
- "Recipient has accepted the split, committing decision.",
- "id"_attr = _migrationId);
-
- return _updateStateDocument(executor, token, ShardSplitDonorStateEnum::kCommitted)
- .then([this, executor, token](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+ .then([this, executor, abortToken] {
+ LOGV2(6142503, "Entering 'committed' state.", "id"_attr = _stateDoc.getId());
+
+ return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kCommitted)
+ .then([this, executor, abortToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
});
});
}
ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) {
-
+ const ScopedTaskExecutorPtr& executor,
+ const CancellationToken& primaryToken,
+ const CancellationToken& abortToken) {
ShardSplitDonorStateEnum nextState;
{
stdx::lock_guard<Latch> lg(_mutex);
@@ -606,13 +603,15 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
}
_abortReason =
- Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit.");
+ Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'.");
BSONObjBuilder bob;
_abortReason->serializeErrorToBSON(&bob);
_stateDoc.setAbortReason(bob.obj());
_stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
nextState = ShardSplitDonorStateEnum::kAborted;
+
+ LOGV2(8423355, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId());
} else {
auto recipientTagName = _stateDoc.getRecipientTagName();
invariant(recipientTagName);
@@ -634,8 +633,13 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
? *_splitAcceptanceTaskExecutorForTest
: _shardSplitService->getInstanceCleanupExecutor();
+ LOGV2(6142508,
+ "Monitoring recipient nodes for split acceptance.",
+ "id"_attr = _migrationId,
+ "recipientConnectionString"_attr = recipientConnectionString);
+
return detail::makeRecipientAcceptSplitFuture(
- executor, primaryServiceToken, recipientConnectionString)
+ executor, abortToken, recipientConnectionString)
.unsafeToInlineFuture();
});
@@ -649,14 +653,11 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
_stateDoc.setRecipientConnectionString(recipientConnectionString);
_stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
nextState = ShardSplitDonorStateEnum::kBlocking;
+
+ LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId());
}
}
- LOGV2(6086504,
- "Inserting initial state document.",
- "id"_attr = _migrationId,
- "state"_attr = nextState);
-
return AsyncTry([this, nextState, uuid = _migrationId]() {
auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto opCtx = opCtxHolder.get();
@@ -705,13 +706,13 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOr
return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
})
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, primaryServiceToken)
- .then([this, executor, primaryServiceToken](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryServiceToken);
+ .on(**executor, primaryToken)
+ .then([this, executor, primaryToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
})
.then([this, executor, nextState]() {
uassert(ErrorCodes::TenantMigrationAborted,
- "Shard split operation aborted",
+ "Shard split operation aborted.",
nextState != ShardSplitDonorStateEnum::kAborted);
});
}
@@ -778,7 +779,6 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForMajority
void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
-
auto timeoutFuture =
(*executor)->sleepFor(Milliseconds(repl::shardSplitTimeoutMS.load()), abortToken);
@@ -831,7 +831,7 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
// operation failed. In either case we abort the migration.
if (abortToken.isCanceled()) {
statusWithState =
- Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit.");
+ Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit' command.");
}
{
@@ -865,52 +865,45 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
}
ExecutorFuture<void>
-ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
- LOGV2(6236603,
- "Waiting to receive 'forgetShardSplit' command.",
- "migrationId"_attr = _migrationId);
+ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectable(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
+ LOGV2(6236603, "Waiting to receive 'forgetShardSplit' command.", "id"_attr = _migrationId);
auto expiredAt = [&]() {
stdx::lock_guard<Latch> lg(_mutex);
return _stateDoc.getExpireAt();
}();
if (expiredAt) {
- LOGV2(6236604, "expiredAt is already set", "migrationId"_attr = _migrationId);
return ExecutorFuture(**executor);
}
- return future_util::withCancellation(_forgetShardSplitReceivedPromise.getFuture(), token)
+ return future_util::withCancellation(_forgetShardSplitReceivedPromise.getFuture(), primaryToken)
.thenRunOn(**executor)
- .then([this, self = shared_from_this(), executor, token] {
- LOGV2(6236606,
- "Marking shard split as garbage-collectable.",
- "migrationId"_attr = _migrationId);
+ .then([this, self = shared_from_this(), executor, primaryToken] {
+ LOGV2(6236606, "Marking shard split as garbage-collectable.", "id"_attr = _migrationId);
stdx::lock_guard<Latch> lg(_mutex);
_stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
return AsyncTry([this, self = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
-
- uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ uassertStatusOK(serverless::updateStateDoc(opCtx.get(), _stateDoc));
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
.until(
[](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, token);
+ .on(**executor, primaryToken);
})
- .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+ .then([this, self = shared_from_this(), executor, primaryToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
});
}
ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientStateDoc(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
-
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
+ LOGV2(6309000, "Cleaning up shard split operation on recipient.", "id"_attr = _migrationId);
return AsyncTry([this, self = shared_from_this()] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto deleted =
@@ -923,7 +916,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientS
})
.until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, token)
+ .on(**executor, primaryToken)
.ignoreValue();
}
} // namespace mongo
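
A second theme in this file is renaming the generic 'token' parameters so each
helper declares which cancellation source it honors: 'primaryToken' (fired on
stepdown or shutdown of the primary-only service) versus 'abortToken'
(additionally fired by tryAbort(), which cancels _abortSource). A hedged
sketch of how such a token pair is typically derived with MongoDB's
CancellationSource; the exact wiring in this service may differ:

    // Illustrative only: abortToken as a child of primaryToken, so a stepdown
    // cancels both, while 'abortShardSplit' cancels only the abort token.
    CancellationSource abortSource(primaryToken);
    CancellationToken abortToken = abortSource.token();
    // tryAbort() then reduces to: abortSource.cancel();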
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index 787d09372be..190a57b3cd3 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -155,19 +155,20 @@ public:
private:
// Tasks
ExecutorFuture<void> _enterBlockingOrAbortedState(const ScopedTaskExecutorPtr& executor,
- const CancellationToken& token);
+ const CancellationToken& primaryToken,
+ const CancellationToken& abortToken);
ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken);
ExecutorFuture<void> _applySplitConfigToDonor(const ScopedTaskExecutorPtr& executor,
- const CancellationToken& token);
+ const CancellationToken& abortToken);
ExecutorFuture<void> _waitForRecipientToAcceptSplit(const ScopedTaskExecutorPtr& executor,
- const CancellationToken& token);
+ const CancellationToken& abortToken);
- ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+ ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectable(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken);
ExecutorFuture<DurableState> _handleErrorOrEnterAbortedState(
StatusWith<DurableState> durableState,
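
The header diff mirrors the parameter renames above and also corrects the
spelling of the forget/garbage-collection helper (Collectible -> Collectable).
The resulting declaration, exactly as it now appears in the header:

    ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectable(
        const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken);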
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 329409cc6c4..aad81bbf5dc 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -579,7 +579,7 @@ TEST_F(ShardSplitDonorServiceTest, AbortDueToRecipientNodesValidation) {
auto result = decisionFuture.get();
- ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
+ ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
ASSERT_FALSE(serviceInstance->isGarbageCollectable());
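
The single test change locks in the behavior fix from the first file: a failed
recipient-node validation now resolves the decision as kAborted, carrying the
validation status, instead of the old (misleading) kCommitted. Pairing the
production change with the updated assertion:

    // shard_split_donor_service.cpp: the decision now records the failure.
    _decisionPromise.emplaceValue(
        DurableState{ShardSplitDonorStateEnum::kAborted, isConfigValidWithStatus});

    // shard_split_donor_service_test.cpp: and the test expects the abort.
    ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);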