summaryrefslogtreecommitdiff
path: root/src/mongo/db/serverless
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/serverless')
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.cpp109
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp94
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp172
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.h10
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service_test.cpp97
5 files changed, 254 insertions, 228 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index a7e15963bf7..20c4dcfb7d8 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -38,6 +38,10 @@
namespace mongo {
namespace {
+bool isSecondary(const OperationContext* opCtx) {
+ return !opCtx->writesAreReplicated();
+}
+
const auto tenantIdsToDeleteDecoration =
OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>();
@@ -123,70 +127,49 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
*/
void onBlockerInitialization(OperationContext* opCtx,
const ShardSplitDonorDocument& donorStateDoc) {
- invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized);
+ invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking);
+ invariant(donorStateDoc.getBlockTimestamp());
auto optionalTenants = donorStateDoc.getTenantIds();
invariant(optionalTenants);
- auto recipientTagName = donorStateDoc.getRecipientTagName();
- auto recipientSetName = donorStateDoc.getRecipientSetName();
- invariant(recipientTagName);
- invariant(recipientSetName);
-
- auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
- auto recipientConnectionString =
- serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
-
- for (const auto& tenantId : optionalTenants.get()) {
- auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
- opCtx->getServiceContext(),
- donorStateDoc.getId(),
- tenantId.toString(),
- MigrationProtocolEnum::kMultitenantMigrations,
- recipientConnectionString.toString());
-
- TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab);
-
- if (opCtx->writesAreReplicated()) {
- // onRollback is not registered on secondaries since secondaries should not fail to
- // apply the write.
- opCtx->recoveryUnit()->onRollback([opCtx, donorStateDoc, tenant = tenantId.toString()] {
- TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .remove(tenant, TenantMigrationAccessBlocker::BlockerType::kDonor);
- });
- }
- }
-}
+ const auto& tenantIds = optionalTenants.get();
-/**
- * Transitions the TenantMigrationDonorAccessBlocker to the blocking state.
- */
-void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocument& donorStateDoc) {
- invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking);
- invariant(donorStateDoc.getBlockTimestamp());
- invariant(donorStateDoc.getTenantIds());
+ // The primary create and sets the tenant access blocker to blocking within the
+ // ShardSplitDonorService.
+ if (isSecondary(opCtx)) {
+ auto recipientTagName = donorStateDoc.getRecipientTagName();
+ auto recipientSetName = donorStateDoc.getRecipientSetName();
+ invariant(recipientTagName);
+ invariant(recipientSetName);
- if (donorStateDoc.getTenantIds()) {
- auto tenantIds = donorStateDoc.getTenantIds().get();
- for (auto tenantId : tenantIds) {
- auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
- opCtx->getServiceContext(), tenantId);
- invariant(mtab);
-
- if (!opCtx->writesAreReplicated()) {
- // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker
- // before reserving the OpTime for the "start blocking" write, so only secondaries
- // call startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op
- // observer.
- mtab->startBlockingWrites();
- }
+ auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+ auto recipientConnectionString =
+ serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
+
+ for (const auto& tenantId : tenantIds) {
+ auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
+ opCtx->getServiceContext(),
+ donorStateDoc.getId(),
+ tenantId.toString(),
+ MigrationProtocolEnum::kMultitenantMigrations,
+ recipientConnectionString.toString());
- // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
- // startBlockingReadsAfter just needs to be called before the "start blocking" write's
- // oplog hole is filled.
- mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get());
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .add(tenantId, mtab);
+
+ // No rollback handler is necessary as the write should not fail on secondaries.
+ mtab->startBlockingWrites();
}
}
+
+ for (const auto& tenantId : tenantIds) {
+ auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
+ opCtx->getServiceContext(), tenantId);
+ invariant(mtab);
+
+ mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get());
+ }
}
/**
@@ -325,17 +308,17 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx,
for (auto it = first; it != last; it++) {
auto donorStateDoc = parseAndValidateDonorDocument(it->doc);
switch (donorStateDoc.getState()) {
- case ShardSplitDonorStateEnum::kUninitialized:
+ case ShardSplitDonorStateEnum::kBlocking:
onBlockerInitialization(opCtx, donorStateDoc);
break;
case ShardSplitDonorStateEnum::kAborted:
// If the operation starts aborted, do not do anything.
break;
- case ShardSplitDonorStateEnum::kBlocking:
+ case ShardSplitDonorStateEnum::kUninitialized:
case ShardSplitDonorStateEnum::kCommitted:
uasserted(ErrorCodes::IllegalOperation,
- "cannot insert a donor's state doc with 'state' other than 'aborting "
- "index builds'");
+ "cannot insert a donor's state doc with 'state' other than 'kAborted' or "
+ "'kBlocking'");
break;
default:
MONGO_UNREACHABLE;
@@ -352,14 +335,16 @@ void ShardSplitDonorOpObserver::onUpdate(OperationContext* opCtx,
auto donorStateDoc = parseAndValidateDonorDocument(args.updateArgs->updatedDoc);
switch (donorStateDoc.getState()) {
- case ShardSplitDonorStateEnum::kBlocking:
- onTransitionToBlocking(opCtx, donorStateDoc);
- break;
case ShardSplitDonorStateEnum::kCommitted:
case ShardSplitDonorStateEnum::kAborted:
opCtx->recoveryUnit()->registerChange(
std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(opCtx, donorStateDoc));
break;
+ case ShardSplitDonorStateEnum::kBlocking:
+ uasserted(ErrorCodes::IllegalOperation,
+ "The state document should be inserted as blocking and never transition to "
+ "blocking");
+ break;
default:
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
index d040690e0cf..c52868126e6 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
@@ -49,13 +49,6 @@ void startBlockingReadsAfter(
}
}
-void startBlockingWrites(
- std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>>& blockers) {
- for (auto& blocker : blockers) {
- blocker->startBlockingWrites();
- }
-}
-
class ShardSplitDonorOpObserverTest : public ServiceContextMongoDTest {
public:
void setUp() override {
@@ -110,14 +103,7 @@ protected:
void runUpdateTestCase(
ShardSplitDonorDocument stateDocument,
const std::vector<std::string>& tenants,
- std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> blockers,
std::function<void(std::shared_ptr<TenantMigrationAccessBlocker>)> mtabVerifier) {
- ASSERT_EQ(tenants.size(), blockers.size());
-
- for (size_t i = 0; i < blockers.size(); ++i) {
- TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext())
- .add(tenants[i], std::move(blockers[i]));
- }
// If there's an exception, aborting without removing the access blocker will trigger an
// invariant. This creates a confusing error log in the test output.
@@ -140,13 +126,13 @@ protected:
scopedTenants.dismiss();
}
- std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> createBlockers(
- const std::vector<std::string>& tenants,
- OperationContext* opCtx,
- const std::string& connectionStr) {
+ std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>>
+ createBlockersAndStartBlockingWrites(const std::vector<std::string>& tenants,
+ OperationContext* opCtx,
+ const std::string& connectionStr) {
auto uuid = UUID::gen();
std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> blockers;
- for (auto& tenant : tenants) {
+ for (const auto& tenant : tenants) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
_opCtx->getServiceContext(),
uuid,
@@ -154,7 +140,9 @@ protected:
MigrationProtocolEnum::kMultitenantMigrations,
_connectionStr);
- blockers.push_back(std::move(mtab));
+ blockers.push_back(mtab);
+ mtab->startBlockingWrites();
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenant, mtab);
}
return blockers;
@@ -227,6 +215,7 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertWrongType) {
TEST_F(ShardSplitDonorOpObserverTest, InitialInsertInvalidState) {
std::vector<ShardSplitDonorStateEnum> states = {ShardSplitDonorStateEnum::kAborted,
ShardSplitDonorStateEnum::kBlocking,
+ ShardSplitDonorStateEnum::kUninitialized,
ShardSplitDonorStateEnum::kCommitted};
for (auto state : states) {
@@ -264,14 +253,23 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertValidAbortedDocument) {
}
}
-TEST_F(ShardSplitDonorOpObserverTest, InsertDocument) {
+TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentPrimary) {
test::shard_split::reconfigToAddRecipientNodes(
getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts());
+ createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr);
+
auto stateDocument = defaultStateDocument();
+ stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
+ stateDocument.setBlockTimestamp(Timestamp(1, 1));
+
auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
ASSERT_TRUE(mtab);
- ASSERT_OK(mtab->checkIfCanWrite(Timestamp(1)).code());
+ // The OpObserver does not set the mtab to blocking for primaries.
+ ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 1)).code(),
+ ErrorCodes::TenantMigrationConflict);
+ ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 3)).code(),
+ ErrorCodes::TenantMigrationConflict);
ASSERT_OK(mtab->checkIfLinearizableReadWasAllowed(opCtx));
ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict);
};
@@ -279,16 +277,17 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertDocument) {
runInsertTestCase(stateDocument, _tenantIds, mtabVerifier);
}
-TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingPrimary) {
+TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentSecondary) {
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts());
+
+ // This indicates the instance is secondary for the OpObserver.
+ repl::UnreplicatedWritesBlock setSecondary(_opCtx.get());
+
auto stateDocument = defaultStateDocument();
stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
stateDocument.setBlockTimestamp(Timestamp(1, 1));
- auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr);
- for (auto& blocker : blockers) {
- blocker->startBlockingWrites();
- }
-
auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
ASSERT_TRUE(mtab);
// The OpObserver does not set the mtab to blocking for primaries.
@@ -300,10 +299,11 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingPrimary) {
ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict);
};
- runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier);
+ runInsertTestCase(stateDocument, _tenantIds, mtabVerifier);
}
-TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingSecondary) {
+
+TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingFail) {
// This indicates the instance is secondary for the OpObserver.
repl::UnreplicatedWritesBlock setSecondary(_opCtx.get());
@@ -311,19 +311,23 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingSecondary) {
stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
stateDocument.setBlockTimestamp(Timestamp(1, 1));
- auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr);
- auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
- ASSERT_TRUE(mtab);
- ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 1)).code(),
- ErrorCodes::TenantMigrationConflict);
- ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 3)).code(),
- ErrorCodes::TenantMigrationConflict);
- ASSERT_OK(mtab->checkIfLinearizableReadWasAllowed(opCtx));
- ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict);
+ CollectionUpdateArgs updateArgs;
+ updateArgs.stmtIds = {};
+ updateArgs.updatedDoc = stateDocument.toBSON();
+ updateArgs.update =
+ BSON("$set" << BSON(ShardSplitDonorDocument::kStateFieldName
+ << ShardSplitDonorState_serializer(stateDocument.getState())));
+ updateArgs.criteria = BSON("_id" << stateDocument.getId());
+ OplogUpdateEntryArgs update(&updateArgs, _nss, stateDocument.getId());
+
+ auto update_lambda = [&]() {
+ WriteUnitOfWork wuow(_opCtx.get());
+ _observer->onUpdate(_opCtx.get(), update);
+ wuow.commit();
};
- runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier);
+ ASSERT_THROWS_CODE(update_lambda(), DBException, ErrorCodes::IllegalOperation);
}
TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) {
@@ -336,8 +340,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) {
stateDocument.setBlockTimestamp(Timestamp(1, 2));
stateDocument.setCommitOrAbortOpTime(commitOpTime);
- auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr);
- startBlockingWrites(blockers);
+ auto blockers = createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr);
startBlockingReadsAfter(blockers, Timestamp(1));
auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
@@ -351,7 +354,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) {
ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationCommitted);
};
- runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier);
+ runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
}
TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) {
@@ -369,8 +372,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) {
stateDocument.setCommitOrAbortOpTime(commitOpTime);
stateDocument.setAbortReason(bob.obj());
- auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr);
- startBlockingWrites(blockers);
+ auto blockers = createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr);
startBlockingReadsAfter(blockers, Timestamp(1));
auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
@@ -383,7 +385,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) {
ASSERT_OK(mtab->checkIfCanBuildIndex().code());
};
- runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier);
+ runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
}
} // namespace
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index db394bda552..71cdd0879ed 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -55,6 +55,10 @@ namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
+MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeRecipientCleanup);
+
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
bool shouldStopInsertingDonorStateDoc(Status status) {
@@ -106,10 +110,39 @@ void checkForTokenInterrupt(const CancellationToken& token) {
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
}
-MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlocking);
-MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
-MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance);
-MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeRecipientCleanup);
+void insertTenantAccessBlocker(WithLock lk,
+ OperationContext* opCtx,
+ ShardSplitDonorDocument donorStateDoc) {
+ auto optionalTenants = donorStateDoc.getTenantIds();
+ invariant(optionalTenants);
+
+ auto recipientTagName = donorStateDoc.getRecipientTagName();
+ auto recipientSetName = donorStateDoc.getRecipientSetName();
+ invariant(recipientTagName);
+ invariant(recipientSetName);
+
+ auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+ auto recipientConnectionString =
+ serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
+
+ for (const auto& tenantId : optionalTenants.get()) {
+ auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
+ opCtx->getServiceContext(),
+ donorStateDoc.getId(),
+ tenantId.toString(),
+ MigrationProtocolEnum::kMultitenantMigrations,
+ recipientConnectionString.toString());
+
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab);
+
+ // If the wuow fails, we need to remove the access blockers we just added. This ensures we
+ // leave things in a valid state and are able to retry the operation.
+ opCtx->recoveryUnit()->onRollback([opCtx, donorStateDoc, tenant = tenantId.toString()] {
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .remove(tenant, TenantMigrationAccessBlocker::BlockerType::kDonor);
+ });
+ }
+}
const std::string kTTLIndexName = "ShardSplitDonorTTLIndex";
@@ -324,6 +357,8 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
"id"_attr = _migrationId,
"timeout"_attr = repl::shardSplitTimeoutMS.load());
+ _createReplicaSetMonitor(abortToken);
+
_decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
.then([this, executor, primaryToken] {
@@ -332,21 +367,15 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
// inserting the initial state document fails.
return _writeInitialDocument(executor, primaryToken);
})
- .then([this] { pauseShardSplitBeforeBlocking.pauseWhileSet(); })
.then([this, executor, abortToken] {
checkForTokenInterrupt(abortToken);
_cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
- _createReplicaSetMonitor(abortToken);
- return _enterBlockingState(executor, abortToken);
- })
- .then([this, abortToken] {
if (MONGO_unlikely(pauseShardSplitAfterBlocking.shouldFail())) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
pauseShardSplitAfterBlocking.pauseWhileSetAndNotCanceled(opCtx.get(),
abortToken);
}
- })
- .then([this, executor, abortToken] {
+
return _waitForRecipientToReachBlockTimestamp(executor, abortToken);
})
.then([this, executor, abortToken] {
@@ -401,24 +430,6 @@ boost::optional<BSONObj> ShardSplitDonorService::DonorStateMachine::reportForCur
return bob.obj();
}
-ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingState(
- const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
- checkForTokenInterrupt(abortToken);
-
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getState() >= ShardSplitDonorStateEnum::kBlocking) {
- return ExecutorFuture(**executor);
- }
- }
-
- LOGV2(6177200, "Entering blocking state.", "id"_attr = _migrationId);
- return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kBlocking)
- .then([this, executor, abortToken](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
- });
-}
-
ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp(
const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
@@ -540,37 +551,40 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien
ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDocument(
const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) {
- const auto initialState = [&]() {
+ ShardSplitDonorStateEnum nextState;
+ {
stdx::lock_guard<Latch> lg(_mutex);
- return _stateDoc.getState();
- }();
- if (initialState == ShardSplitDonorStateEnum::kAborted) {
- stdx::lock_guard<Latch> lg(_mutex);
- if (isAbortedDocumentPersistent(lg, _stateDoc)) {
- // Node has step up and created an instance using a document in abort state. No need
- // to write the document as it already exists.
+ if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
+ if (isAbortedDocumentPersistent(lg, _stateDoc)) {
+ // Node has step up and created an instance using a document in abort state. No need
+ // to write the document as it already exists.
+ return ExecutorFuture(**executor);
+ }
+
+ _abortReason =
+ Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit.");
+ BSONObjBuilder bob;
+ _abortReason->serializeErrorToBSON(&bob);
+ _stateDoc.setAbortReason(bob.obj());
+ nextState = ShardSplitDonorStateEnum::kAborted;
+
+ } else if (_stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized) {
+ _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
+ nextState = ShardSplitDonorStateEnum::kBlocking;
+ } else {
+ // Node has step up and resumed a shard split. No need to write the document as it
+ // already exists.
return ExecutorFuture(**executor);
}
-
- _abortReason =
- Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit.");
- BSONObjBuilder bob;
- _abortReason->serializeErrorToBSON(&bob);
- _stateDoc.setAbortReason(bob.obj());
-
- } else if (initialState != ShardSplitDonorStateEnum::kUninitialized) {
- // Node has step up and resumed a shard split. No need to write the document as it
- // already exists.
- return ExecutorFuture(**executor);
}
LOGV2(6086504,
"Inserting initial state document.",
"id"_attr = _migrationId,
- "state"_attr = initialState);
+ "state"_attr = nextState);
- return AsyncTry([this, nextState = initialState, uuid = _migrationId]() {
+ return AsyncTry([this, nextState, uuid = _migrationId]() {
auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto opCtx = opCtxHolder.get();
@@ -584,33 +598,32 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDoc
return _stateDoc.toBSON();
};
- if (nextState == ShardSplitDonorStateEnum::kAborted) {
- WriteUnitOfWork wuow(opCtx);
-
- // Reserve an opTime for the write.
- auto oplogSlot =
- LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
- setStateDocTimestamps(
- stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc);
- auto updateResult = Helpers::upsert(opCtx,
- _stateDocumentsNS.ns(),
- filter,
- getUpdatedStateDocBson(),
- /*fromMigrate=*/false);
-
- // We only want to insert, not modify, document
- invariant(updateResult.numDocsModified == 0);
- wuow.commit();
- } else {
- auto updateResult = Helpers::upsert(opCtx,
- _stateDocumentsNS.ns(),
- filter,
- getUpdatedStateDocBson(),
- /*fromMigrate=*/false);
-
- // We only want to insert, not modify, document
- invariant(updateResult.numDocsModified == 0);
+ WriteUnitOfWork wuow(opCtx);
+ if (nextState == ShardSplitDonorStateEnum::kBlocking) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ insertTenantAccessBlocker(lg, opCtx, _stateDoc);
+
+ auto tenantIds = _stateDoc.getTenantIds();
+ invariant(tenantIds);
+ setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get());
}
+
+ // Reserve an opTime for the write.
+ auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
+ setStateDocTimestamps(
+ stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc);
+
+ auto updateResult = Helpers::upsert(opCtx,
+ _stateDocumentsNS.ns(),
+ filter,
+ getUpdatedStateDocBson(),
+ /*fromMigrate=*/false);
+
+
+ // We only want to insert, not modify, document
+ invariant(updateResult.numMatched == 0);
+ wuow.commit();
});
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
@@ -623,7 +636,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDoc
.then([this, executor, primaryServiceToken](repl::OpTime opTime) {
return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryServiceToken);
})
- .then([this, executor, nextState = initialState]() {
+ .then([this, executor, nextState]() {
uassert(ErrorCodes::TenantMigrationAborted,
"Shard split operation aborted",
nextState != ShardSplitDonorStateEnum::kAborted);
@@ -654,10 +667,6 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] {
WriteUnitOfWork wuow(opCtx);
- if (nextState == ShardSplitDonorStateEnum::kBlocking) {
- invariant(tenantIds);
- setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get());
- }
// Reserve an opTime for the write.
auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
setStateDocTimestamps(
@@ -749,7 +758,6 @@ void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor(
}();
_recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture());
- _replicaSetMonitorCreatedPromise.emplaceValue();
}
ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState>
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index d001bba0de1..6af6d79ad49 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -131,10 +131,6 @@ public:
return _completionPromise.getFuture();
}
- SharedSemiFuture<void> replicaSetMonitorCreatedFuture() const {
- return _replicaSetMonitorCreatedPromise.getFuture();
- }
-
UUID getId() const {
return _migrationId;
}
@@ -156,9 +152,6 @@ public:
private:
// Tasks
- ExecutorFuture<void> _enterBlockingState(const ScopedTaskExecutorPtr& executor,
- const CancellationToken& token);
-
ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp(
const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
@@ -223,9 +216,6 @@ private:
boost::optional<CancellationSource> _abortSource;
boost::optional<Status> _abortReason;
- // A promise fulfilled when the replicaSetMonitor has been created;
- SharedPromise<void> _replicaSetMonitorCreatedPromise;
-
// A promise fulfilled when the shard split has committed or aborted.
SharedPromise<DurableState> _decisionPromise;
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index ddcfdba81aa..9f9acd832c9 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -45,6 +45,8 @@
#include "mongo/db/repl/primary_only_service_test_fixture.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_registry.h"
+#include "mongo/db/repl/tenant_migration_donor_access_blocker.h"
#include "mongo/db/repl/wait_for_majority_service.h"
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_donor_service.h"
@@ -228,11 +230,10 @@ protected:
StreamableReplicaSetMonitorForTesting _rsmMonitor;
std::string _recipientTagName{"$recipientNode"};
std::string _recipientSetName{_replSet.getURI().getSetName()};
+ FailPointEnableBlock _skipAcceptanceFP{"skipShardSplitWaitForSplitAcceptance"};
};
TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) {
- FailPointEnableBlock fp("skipShardSplitWaitForSplitAcceptance");
-
auto opCtx = makeOperationContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
test::shard_split::reconfigToAddRecipientNodes(
@@ -245,44 +246,28 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
ASSERT_EQ(_uuid, serviceInstance->getId());
auto decisionFuture = serviceInstance->decisionFuture();
-
- std::shared_ptr<TopologyDescription> topologyDescriptionOld =
- std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration());
- std::shared_ptr<TopologyDescription> topologyDescriptionNew =
- makeRecipientTopologyDescription(_replSet);
-
- // Wait until the RSM has been created by the instance.
- auto replicaSetMonitorCreatedFuture = serviceInstance->replicaSetMonitorCreatedFuture();
- replicaSetMonitorCreatedFuture.wait(opCtx.get());
-
- // Retrieve monitor installed by _rsmMonitor.setup(...)
- auto monitor = std::dynamic_pointer_cast<StreamableReplicaSetMonitor>(
- ReplicaSetMonitor::createIfNeeded(_replSet.getURI()));
- invariant(monitor);
- auto publisher = monitor->getEventsPublisher();
-
- publisher->onTopologyDescriptionChangedEvent(topologyDescriptionOld, topologyDescriptionNew);
-
decisionFuture.wait();
+ auto result = decisionFuture.get();
+ ASSERT(!result.abortReason);
+ ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
+
BSONObj splitConfigBson = mockReplSetReconfigCmd.getLatestConfig();
ASSERT_TRUE(splitConfigBson.hasField("replSetReconfig"));
auto splitConfig = repl::ReplSetConfig::parse(splitConfigBson["replSetReconfig"].Obj());
ASSERT(splitConfig.isSplitConfig());
- auto result = decisionFuture.get();
- ASSERT(!result.abortReason);
- ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
-
serviceInstance->tryForget();
+ auto completionFuture = serviceInstance->completionFuture();
+ completionFuture.wait();
+
ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
- FailPointEnableBlock fp2("skipShardSplitWaitForSplitAcceptance");
auto opCtx = makeOperationContext();
auto serviceContext = getServiceContext();
@@ -352,7 +337,7 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) {
std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance;
{
- FailPointEnableBlock fp("pauseShardSplitBeforeBlocking");
+ FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
auto initialTimesEntered = fp.initialTimesEntered();
serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
@@ -386,7 +371,7 @@ TEST_F(ShardSplitDonorServiceTest, StepDownTest) {
std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance;
{
- FailPointEnableBlock fp("pauseShardSplitBeforeBlocking");
+ FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
auto initialTimesEntered = fp.initialTimesEntered();
serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
@@ -415,7 +400,14 @@ TEST_F(ShardSplitDonorServiceTest, DeleteStateDocMarkedGarbageCollectable) {
getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
auto stateDocument = defaultStateDocument();
- stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized);
+ stateDocument.setState(ShardSplitDonorStateEnum::kAborted);
+ stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1));
+
+ Status status(ErrorCodes::CallbackCanceled, "Split has been aborted");
+ BSONObjBuilder bob;
+ status.serializeErrorToBSON(&bob);
+ stateDocument.setAbortReason(bob.obj());
+
boost::optional<mongo::Date_t> expireAt = getServiceContext()->getFastClockSource()->now() +
Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()};
stateDocument.setExpireAt(expireAt);
@@ -567,6 +559,55 @@ protected:
std::unique_ptr<FailPointEnableBlock> _pauseBeforeRecipientCleanupFp;
FailPoint::EntryCountT _initialTimesEntered;
};
+
+TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) {
+ auto opCtx = makeOperationContext();
+ test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+
+ auto initialFuture = [&]() {
+ FailPointEnableBlock fp("pauseShardSplitAfterBlocking");
+ auto initialTimesEntered = fp.initialTimesEntered();
+
+ std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance =
+ ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, defaultStateDocument().toBSON());
+ ASSERT(serviceInstance.get());
+
+ fp->waitForTimesEntered(initialTimesEntered + 1);
+ stepDown();
+
+ return serviceInstance->decisionFuture();
+ }();
+
+ auto result = initialFuture.getNoThrow();
+ ASSERT_FALSE(result.isOK());
+ ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus().code());
+
+ ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus());
+
+ auto fp = std::make_unique<FailPointEnableBlock>("pauseShardSplitAfterBlocking");
+ auto initialTimesEntered = fp->initialTimesEntered();
+
+ stepUp(opCtx.get());
+
+ fp->failPoint()->waitForTimesEntered(initialTimesEntered + 1);
+
+ std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance =
+ ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, defaultStateDocument().toBSON());
+ ASSERT(serviceInstance.get());
+
+ ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus());
+
+ fp.reset();
+
+ ASSERT_OK(serviceInstance->decisionFuture().getNoThrow().getStatus());
+
+ serviceInstance->tryForget();
+}
+
class ShardSplitRecipientCleanupTest : public ShardSplitPersistenceTest {
public:
repl::ReplSetConfig initialDonorConfig() override {