summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authormathisbessamdb <mathis.bessa@mongodb.com>2022-05-10 22:59:14 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-10 23:26:48 +0000
commite22aedb467254e680b705f04642e8770264af42d (patch)
tree52f43addd3c8c36d0e386b5c6de79e0bda39ad95 /src
parent6a16461988b990f6ea2ebe4e129bad1addada035 (diff)
downloadmongo-e22aedb467254e680b705f04642e8770264af42d.tar.gz
SERVER-64906 Copy applicable Tenant Migration jstests to shard split
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp51
1 files changed, 49 insertions, 2 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 8d70460b54f..cd0636344bd 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -56,10 +56,13 @@ namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(abortShardSplitBeforeLeavingBlockingState);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlockingState);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterDecision);
MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeRecipientCleanup);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterMarkingStateGarbageCollectable);
MONGO_FAIL_POINT_DEFINE(skipShardSplitRecipientCleanup);
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
@@ -388,6 +391,9 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
// Note we do not use the abort split token here because the abortShardSplit
// command waits for a decision to be persisted which will not happen if
// inserting the initial state document fails.
+ if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) {
+ pauseShardSplitBeforeBlockingState.pauseWhileSet();
+ }
return _enterBlockingOrAbortedState(executor, primaryToken, abortToken);
})
.then([this, executor, abortToken] {
@@ -396,6 +402,10 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get());
+ if (MONGO_unlikely(abortShardSplitBeforeLeavingBlockingState.shouldFail())) {
+ uasserted(ErrorCodes::InternalError, "simulate a shard split error");
+ }
+
return _waitForRecipientToReachBlockTimestamp(executor, abortToken);
})
.then([this, executor, abortToken] {
@@ -460,6 +470,35 @@ boost::optional<BSONObj> ShardSplitDonorService::DonorStateMachine::reportForCur
stdx::lock_guard<Latch> lg(_mutex);
BSONObjBuilder bob;
bob.append("desc", "shard split operation");
+ _migrationId.appendToBuilder(&bob, "instanceID"_sd);
+ bob.append("reachedDecision", _decisionPromise.getFuture().isReady());
+ if (_stateDoc.getExpireAt()) {
+ bob.append("expireAt", *_stateDoc.getExpireAt());
+ }
+ const auto& tenantIds = _stateDoc.getTenantIds();
+ if (tenantIds) {
+ bob.append("tenantIds", *tenantIds);
+ }
+ if (_stateDoc.getBlockTimestamp()) {
+ bob.append("blockTimestamp", *_stateDoc.getBlockTimestamp());
+ }
+ if (_stateDoc.getCommitOrAbortOpTime()) {
+ _stateDoc.getCommitOrAbortOpTime()->append(&bob, "commitOrAbortOpTime");
+ }
+ if (_stateDoc.getAbortReason()) {
+ bob.append("abortReason", *_stateDoc.getAbortReason());
+ }
+ if (_stateDoc.getRecipientConnectionString()) {
+ bob.append("recipientConnectionString",
+ _stateDoc.getRecipientConnectionString()->toString());
+ }
+ if (_stateDoc.getRecipientSetName()) {
+ bob.append("recipientSetName", *_stateDoc.getRecipientSetName());
+ }
+ if (_stateDoc.getRecipientTagName()) {
+ bob.append("recipientTagName", *_stateDoc.getRecipientTagName());
+ }
+
return bob.obj();
}
@@ -761,7 +800,10 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForMajority
const ScopedTaskExecutorPtr& executor, repl::OpTime opTime, const CancellationToken& token) {
return WaitForMajorityService::get(_serviceContext)
.waitUntilMajority(std::move(opTime), token)
- .thenRunOn(**executor);
+ .thenRunOn(**executor)
+ .then([this, self = shared_from_this()] {
+ pauseShardSplitAfterMarkingStateGarbageCollectable.pauseWhileSet();
+ });
}
void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
@@ -771,7 +813,7 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
auto timeoutOrCompletionFuture =
whenAny(std::move(timeoutFuture),
- completionFuture().semi().ignoreValue().thenRunOn(**executor))
+ decisionFuture().semi().ignoreValue().thenRunOn(**executor))
.thenRunOn(**executor)
.then([this, executor, abortToken, anchor = shared_from_this()](auto result) {
stdx::lock_guard<Latch> lg(_mutex);
@@ -827,6 +869,11 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
_abortReason = statusWithState.getStatus();
}
+ if (_abortSource) {
+ // Cancel source to ensure all child threads (RSM monitor, etc) terminate.
+ _abortSource->cancel();
+ }
+
BSONObjBuilder bob;
_abortReason->serializeErrorToBSON(&bob);
_stateDoc.setAbortReason(bob.obj());