diff options
author | Didier Nadeau <didier.nadeau@mongodb.com> | 2022-03-18 15:19:05 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-18 15:53:33 +0000 |
commit | 9c83f3aabd24f31a9a6bf42b979483a7b18ffddc (patch) | |
tree | ca85046c364313b32bd46f4e4a249bc4cfcb1d9e /src/mongo | |
parent | 699fe3691b7dea6f140da76c0da443f6e32f72e6 (diff) | |
download | mongo-9c83f3aabd24f31a9a6bf42b979483a7b18ffddc.tar.gz |
SERVER-63652 Shard split insert document in blocking state
Diffstat (limited to 'src/mongo')
5 files changed, 254 insertions, 228 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index a7e15963bf7..20c4dcfb7d8 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -38,6 +38,10 @@ namespace mongo { namespace { +bool isSecondary(const OperationContext* opCtx) { + return !opCtx->writesAreReplicated(); +} + const auto tenantIdsToDeleteDecoration = OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>(); @@ -123,70 +127,49 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { */ void onBlockerInitialization(OperationContext* opCtx, const ShardSplitDonorDocument& donorStateDoc) { - invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized); + invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking); + invariant(donorStateDoc.getBlockTimestamp()); auto optionalTenants = donorStateDoc.getTenantIds(); invariant(optionalTenants); - auto recipientTagName = donorStateDoc.getRecipientTagName(); - auto recipientSetName = donorStateDoc.getRecipientSetName(); - invariant(recipientTagName); - invariant(recipientSetName); - - auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); - auto recipientConnectionString = - serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName); - - for (const auto& tenantId : optionalTenants.get()) { - auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>( - opCtx->getServiceContext(), - donorStateDoc.getId(), - tenantId.toString(), - MigrationProtocolEnum::kMultitenantMigrations, - recipientConnectionString.toString()); - - TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab); - - if (opCtx->writesAreReplicated()) { - // onRollback is not registered on secondaries since secondaries should not fail to - // apply the write. - opCtx->recoveryUnit()->onRollback([opCtx, donorStateDoc, tenant = tenantId.toString()] { - TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .remove(tenant, TenantMigrationAccessBlocker::BlockerType::kDonor); - }); - } - } -} + const auto& tenantIds = optionalTenants.get(); -/** - * Transitions the TenantMigrationDonorAccessBlocker to the blocking state. - */ -void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocument& donorStateDoc) { - invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking); - invariant(donorStateDoc.getBlockTimestamp()); - invariant(donorStateDoc.getTenantIds()); + // The primary create and sets the tenant access blocker to blocking within the + // ShardSplitDonorService. + if (isSecondary(opCtx)) { + auto recipientTagName = donorStateDoc.getRecipientTagName(); + auto recipientSetName = donorStateDoc.getRecipientSetName(); + invariant(recipientTagName); + invariant(recipientSetName); - if (donorStateDoc.getTenantIds()) { - auto tenantIds = donorStateDoc.getTenantIds().get(); - for (auto tenantId : tenantIds) { - auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( - opCtx->getServiceContext(), tenantId); - invariant(mtab); - - if (!opCtx->writesAreReplicated()) { - // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker - // before reserving the OpTime for the "start blocking" write, so only secondaries - // call startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op - // observer. - mtab->startBlockingWrites(); - } + auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); + auto recipientConnectionString = + serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName); + + for (const auto& tenantId : tenantIds) { + auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>( + opCtx->getServiceContext(), + donorStateDoc.getId(), + tenantId.toString(), + MigrationProtocolEnum::kMultitenantMigrations, + recipientConnectionString.toString()); - // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since - // startBlockingReadsAfter just needs to be called before the "start blocking" write's - // oplog hole is filled. - mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get()); + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .add(tenantId, mtab); + + // No rollback handler is necessary as the write should not fail on secondaries. + mtab->startBlockingWrites(); } } + + for (const auto& tenantId : tenantIds) { + auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( + opCtx->getServiceContext(), tenantId); + invariant(mtab); + + mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get()); + } } /** @@ -325,17 +308,17 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx, for (auto it = first; it != last; it++) { auto donorStateDoc = parseAndValidateDonorDocument(it->doc); switch (donorStateDoc.getState()) { - case ShardSplitDonorStateEnum::kUninitialized: + case ShardSplitDonorStateEnum::kBlocking: onBlockerInitialization(opCtx, donorStateDoc); break; case ShardSplitDonorStateEnum::kAborted: // If the operation starts aborted, do not do anything. break; - case ShardSplitDonorStateEnum::kBlocking: + case ShardSplitDonorStateEnum::kUninitialized: case ShardSplitDonorStateEnum::kCommitted: uasserted(ErrorCodes::IllegalOperation, - "cannot insert a donor's state doc with 'state' other than 'aborting " - "index builds'"); + "cannot insert a donor's state doc with 'state' other than 'kAborted' or " + "'kBlocking'"); break; default: MONGO_UNREACHABLE; @@ -352,14 +335,16 @@ void ShardSplitDonorOpObserver::onUpdate(OperationContext* opCtx, auto donorStateDoc = parseAndValidateDonorDocument(args.updateArgs->updatedDoc); switch (donorStateDoc.getState()) { - case ShardSplitDonorStateEnum::kBlocking: - onTransitionToBlocking(opCtx, donorStateDoc); - break; case ShardSplitDonorStateEnum::kCommitted: case ShardSplitDonorStateEnum::kAborted: opCtx->recoveryUnit()->registerChange( std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(opCtx, donorStateDoc)); break; + case ShardSplitDonorStateEnum::kBlocking: + uasserted(ErrorCodes::IllegalOperation, + "The state document should be inserted as blocking and never transition to " + "blocking"); + break; default: MONGO_UNREACHABLE; } diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp index d040690e0cf..c52868126e6 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp @@ -49,13 +49,6 @@ void startBlockingReadsAfter( } } -void startBlockingWrites( - std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>>& blockers) { - for (auto& blocker : blockers) { - blocker->startBlockingWrites(); - } -} - class ShardSplitDonorOpObserverTest : public ServiceContextMongoDTest { public: void setUp() override { @@ -110,14 +103,7 @@ protected: void runUpdateTestCase( ShardSplitDonorDocument stateDocument, const std::vector<std::string>& tenants, - std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> blockers, std::function<void(std::shared_ptr<TenantMigrationAccessBlocker>)> mtabVerifier) { - ASSERT_EQ(tenants.size(), blockers.size()); - - for (size_t i = 0; i < blockers.size(); ++i) { - TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext()) - .add(tenants[i], std::move(blockers[i])); - } // If there's an exception, aborting without removing the access blocker will trigger an // invariant. This creates a confusing error log in the test output. @@ -140,13 +126,13 @@ protected: scopedTenants.dismiss(); } - std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> createBlockers( - const std::vector<std::string>& tenants, - OperationContext* opCtx, - const std::string& connectionStr) { + std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> + createBlockersAndStartBlockingWrites(const std::vector<std::string>& tenants, + OperationContext* opCtx, + const std::string& connectionStr) { auto uuid = UUID::gen(); std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> blockers; - for (auto& tenant : tenants) { + for (const auto& tenant : tenants) { auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>( _opCtx->getServiceContext(), uuid, @@ -154,7 +140,9 @@ protected: MigrationProtocolEnum::kMultitenantMigrations, _connectionStr); - blockers.push_back(std::move(mtab)); + blockers.push_back(mtab); + mtab->startBlockingWrites(); + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenant, mtab); } return blockers; @@ -227,6 +215,7 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertWrongType) { TEST_F(ShardSplitDonorOpObserverTest, InitialInsertInvalidState) { std::vector<ShardSplitDonorStateEnum> states = {ShardSplitDonorStateEnum::kAborted, ShardSplitDonorStateEnum::kBlocking, + ShardSplitDonorStateEnum::kUninitialized, ShardSplitDonorStateEnum::kCommitted}; for (auto state : states) { @@ -264,14 +253,23 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertValidAbortedDocument) { } } -TEST_F(ShardSplitDonorOpObserverTest, InsertDocument) { +TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentPrimary) { test::shard_split::reconfigToAddRecipientNodes( getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts()); + createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr); + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); + stateDocument.setBlockTimestamp(Timestamp(1, 1)); + auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { ASSERT_TRUE(mtab); - ASSERT_OK(mtab->checkIfCanWrite(Timestamp(1)).code()); + // The OpObserver does not set the mtab to blocking for primaries. + ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 1)).code(), + ErrorCodes::TenantMigrationConflict); + ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 3)).code(), + ErrorCodes::TenantMigrationConflict); ASSERT_OK(mtab->checkIfLinearizableReadWasAllowed(opCtx)); ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; @@ -279,16 +277,17 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertDocument) { runInsertTestCase(stateDocument, _tenantIds, mtabVerifier); } -TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingPrimary) { +TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentSecondary) { + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts()); + + // This indicates the instance is secondary for the OpObserver. + repl::UnreplicatedWritesBlock setSecondary(_opCtx.get()); + auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); stateDocument.setBlockTimestamp(Timestamp(1, 1)); - auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); - for (auto& blocker : blockers) { - blocker->startBlockingWrites(); - } - auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { ASSERT_TRUE(mtab); // The OpObserver does not set the mtab to blocking for primaries. @@ -300,10 +299,11 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingPrimary) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; - runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); + runInsertTestCase(stateDocument, _tenantIds, mtabVerifier); } -TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingSecondary) { + +TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingFail) { // This indicates the instance is secondary for the OpObserver. repl::UnreplicatedWritesBlock setSecondary(_opCtx.get()); @@ -311,19 +311,23 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingSecondary) { stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); stateDocument.setBlockTimestamp(Timestamp(1, 1)); - auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); - auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { - ASSERT_TRUE(mtab); - ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 1)).code(), - ErrorCodes::TenantMigrationConflict); - ASSERT_EQ(mtab->checkIfCanWrite(Timestamp(1, 3)).code(), - ErrorCodes::TenantMigrationConflict); - ASSERT_OK(mtab->checkIfLinearizableReadWasAllowed(opCtx)); - ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); + CollectionUpdateArgs updateArgs; + updateArgs.stmtIds = {}; + updateArgs.updatedDoc = stateDocument.toBSON(); + updateArgs.update = + BSON("$set" << BSON(ShardSplitDonorDocument::kStateFieldName + << ShardSplitDonorState_serializer(stateDocument.getState()))); + updateArgs.criteria = BSON("_id" << stateDocument.getId()); + OplogUpdateEntryArgs update(&updateArgs, _nss, stateDocument.getId()); + + auto update_lambda = [&]() { + WriteUnitOfWork wuow(_opCtx.get()); + _observer->onUpdate(_opCtx.get(), update); + wuow.commit(); }; - runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); + ASSERT_THROWS_CODE(update_lambda(), DBException, ErrorCodes::IllegalOperation); } TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) { @@ -336,8 +340,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) { stateDocument.setBlockTimestamp(Timestamp(1, 2)); stateDocument.setCommitOrAbortOpTime(commitOpTime); - auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); - startBlockingWrites(blockers); + auto blockers = createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr); startBlockingReadsAfter(blockers, Timestamp(1)); auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { @@ -351,7 +354,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationCommitted); }; - runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier); } TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) { @@ -369,8 +372,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) { stateDocument.setCommitOrAbortOpTime(commitOpTime); stateDocument.setAbortReason(bob.obj()); - auto blockers = createBlockers(_tenantIds, _opCtx.get(), _connectionStr); - startBlockingWrites(blockers); + auto blockers = createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr); startBlockingReadsAfter(blockers, Timestamp(1)); auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { @@ -383,7 +385,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) { ASSERT_OK(mtab->checkIfCanBuildIndex().code()); }; - runUpdateTestCase(stateDocument, _tenantIds, blockers, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier); } } // namespace diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index db394bda552..71cdd0879ed 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -55,6 +55,10 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking); +MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance); +MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeRecipientCleanup); + const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); bool shouldStopInsertingDonorStateDoc(Status status) { @@ -106,10 +110,39 @@ void checkForTokenInterrupt(const CancellationToken& token) { uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); } -MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlocking); -MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking); -MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance); -MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeRecipientCleanup); +void insertTenantAccessBlocker(WithLock lk, + OperationContext* opCtx, + ShardSplitDonorDocument donorStateDoc) { + auto optionalTenants = donorStateDoc.getTenantIds(); + invariant(optionalTenants); + + auto recipientTagName = donorStateDoc.getRecipientTagName(); + auto recipientSetName = donorStateDoc.getRecipientSetName(); + invariant(recipientTagName); + invariant(recipientSetName); + + auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); + auto recipientConnectionString = + serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName); + + for (const auto& tenantId : optionalTenants.get()) { + auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>( + opCtx->getServiceContext(), + donorStateDoc.getId(), + tenantId.toString(), + MigrationProtocolEnum::kMultitenantMigrations, + recipientConnectionString.toString()); + + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab); + + // If the wuow fails, we need to remove the access blockers we just added. This ensures we + // leave things in a valid state and are able to retry the operation. + opCtx->recoveryUnit()->onRollback([opCtx, donorStateDoc, tenant = tenantId.toString()] { + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .remove(tenant, TenantMigrationAccessBlocker::BlockerType::kDonor); + }); + } +} const std::string kTTLIndexName = "ShardSplitDonorTTLIndex"; @@ -324,6 +357,8 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( "id"_attr = _migrationId, "timeout"_attr = repl::shardSplitTimeoutMS.load()); + _createReplicaSetMonitor(abortToken); + _decisionPromise.setWith([&] { return ExecutorFuture(**executor) .then([this, executor, primaryToken] { @@ -332,21 +367,15 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( // inserting the initial state document fails. return _writeInitialDocument(executor, primaryToken); }) - .then([this] { pauseShardSplitBeforeBlocking.pauseWhileSet(); }) .then([this, executor, abortToken] { checkForTokenInterrupt(abortToken); _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); - _createReplicaSetMonitor(abortToken); - return _enterBlockingState(executor, abortToken); - }) - .then([this, abortToken] { if (MONGO_unlikely(pauseShardSplitAfterBlocking.shouldFail())) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); pauseShardSplitAfterBlocking.pauseWhileSetAndNotCanceled(opCtx.get(), abortToken); } - }) - .then([this, executor, abortToken] { + return _waitForRecipientToReachBlockTimestamp(executor, abortToken); }) .then([this, executor, abortToken] { @@ -401,24 +430,6 @@ boost::optional<BSONObj> ShardSplitDonorService::DonorStateMachine::reportForCur return bob.obj(); } -ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingState( - const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { - checkForTokenInterrupt(abortToken); - - { - stdx::lock_guard<Latch> lg(_mutex); - if (_stateDoc.getState() >= ShardSplitDonorStateEnum::kBlocking) { - return ExecutorFuture(**executor); - } - } - - LOGV2(6177200, "Entering blocking state.", "id"_attr = _migrationId); - return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kBlocking) - .then([this, executor, abortToken](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken); - }); -} - ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp( const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { @@ -540,37 +551,40 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDocument( const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) { - const auto initialState = [&]() { + ShardSplitDonorStateEnum nextState; + { stdx::lock_guard<Latch> lg(_mutex); - return _stateDoc.getState(); - }(); - if (initialState == ShardSplitDonorStateEnum::kAborted) { - stdx::lock_guard<Latch> lg(_mutex); - if (isAbortedDocumentPersistent(lg, _stateDoc)) { - // Node has step up and created an instance using a document in abort state. No need - // to write the document as it already exists. + if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) { + if (isAbortedDocumentPersistent(lg, _stateDoc)) { + // Node has step up and created an instance using a document in abort state. No need + // to write the document as it already exists. + return ExecutorFuture(**executor); + } + + _abortReason = + Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit."); + BSONObjBuilder bob; + _abortReason->serializeErrorToBSON(&bob); + _stateDoc.setAbortReason(bob.obj()); + nextState = ShardSplitDonorStateEnum::kAborted; + + } else if (_stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized) { + _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking); + nextState = ShardSplitDonorStateEnum::kBlocking; + } else { + // Node has step up and resumed a shard split. No need to write the document as it + // already exists. return ExecutorFuture(**executor); } - - _abortReason = - Status(ErrorCodes::TenantMigrationAborted, "Aborted due to abortShardSplit."); - BSONObjBuilder bob; - _abortReason->serializeErrorToBSON(&bob); - _stateDoc.setAbortReason(bob.obj()); - - } else if (initialState != ShardSplitDonorStateEnum::kUninitialized) { - // Node has step up and resumed a shard split. No need to write the document as it - // already exists. - return ExecutorFuture(**executor); } LOGV2(6086504, "Inserting initial state document.", "id"_attr = _migrationId, - "state"_attr = initialState); + "state"_attr = nextState); - return AsyncTry([this, nextState = initialState, uuid = _migrationId]() { + return AsyncTry([this, nextState, uuid = _migrationId]() { auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto opCtx = opCtxHolder.get(); @@ -584,33 +598,32 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDoc return _stateDoc.toBSON(); }; - if (nextState == ShardSplitDonorStateEnum::kAborted) { - WriteUnitOfWork wuow(opCtx); - - // Reserve an opTime for the write. - auto oplogSlot = - LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - setStateDocTimestamps( - stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc); - auto updateResult = Helpers::upsert(opCtx, - _stateDocumentsNS.ns(), - filter, - getUpdatedStateDocBson(), - /*fromMigrate=*/false); - - // We only want to insert, not modify, document - invariant(updateResult.numDocsModified == 0); - wuow.commit(); - } else { - auto updateResult = Helpers::upsert(opCtx, - _stateDocumentsNS.ns(), - filter, - getUpdatedStateDocBson(), - /*fromMigrate=*/false); - - // We only want to insert, not modify, document - invariant(updateResult.numDocsModified == 0); + WriteUnitOfWork wuow(opCtx); + if (nextState == ShardSplitDonorStateEnum::kBlocking) { + stdx::lock_guard<Latch> lg(_mutex); + + insertTenantAccessBlocker(lg, opCtx, _stateDoc); + + auto tenantIds = _stateDoc.getTenantIds(); + invariant(tenantIds); + setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get()); } + + // Reserve an opTime for the write. + auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; + setStateDocTimestamps( + stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc); + + auto updateResult = Helpers::upsert(opCtx, + _stateDocumentsNS.ns(), + filter, + getUpdatedStateDocBson(), + /*fromMigrate=*/false); + + + // We only want to insert, not modify, document + invariant(updateResult.numMatched == 0); + wuow.commit(); }); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); @@ -623,7 +636,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDoc .then([this, executor, primaryServiceToken](repl::OpTime opTime) { return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryServiceToken); }) - .then([this, executor, nextState = initialState]() { + .then([this, executor, nextState]() { uassert(ErrorCodes::TenantMigrationAborted, "Shard split operation aborted", nextState != ShardSplitDonorStateEnum::kAborted); @@ -654,10 +667,6 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] { WriteUnitOfWork wuow(opCtx); - if (nextState == ShardSplitDonorStateEnum::kBlocking) { - invariant(tenantIds); - setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get()); - } // Reserve an opTime for the write. auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; setStateDocTimestamps( @@ -749,7 +758,6 @@ void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor( }(); _recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture()); - _replicaSetMonitorCreatedPromise.emplaceValue(); } ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState> diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h index d001bba0de1..6af6d79ad49 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.h +++ b/src/mongo/db/serverless/shard_split_donor_service.h @@ -131,10 +131,6 @@ public: return _completionPromise.getFuture(); } - SharedSemiFuture<void> replicaSetMonitorCreatedFuture() const { - return _replicaSetMonitorCreatedPromise.getFuture(); - } - UUID getId() const { return _migrationId; } @@ -156,9 +152,6 @@ public: private: // Tasks - ExecutorFuture<void> _enterBlockingState(const ScopedTaskExecutorPtr& executor, - const CancellationToken& token); - ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp( const ScopedTaskExecutorPtr& executor, const CancellationToken& token); @@ -223,9 +216,6 @@ private: boost::optional<CancellationSource> _abortSource; boost::optional<Status> _abortReason; - // A promise fulfilled when the replicaSetMonitor has been created; - SharedPromise<void> _replicaSetMonitorCreatedPromise; - // A promise fulfilled when the shard split has committed or aborted. SharedPromise<DurableState> _decisionPromise; diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index ddcfdba81aa..9f9acd832c9 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -45,6 +45,8 @@ #include "mongo/db/repl/primary_only_service_test_fixture.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/repl/tenant_migration_access_blocker_registry.h" +#include "mongo/db/repl/tenant_migration_donor_access_blocker.h" #include "mongo/db/repl/wait_for_majority_service.h" #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_donor_service.h" @@ -228,11 +230,10 @@ protected: StreamableReplicaSetMonitorForTesting _rsmMonitor; std::string _recipientTagName{"$recipientNode"}; std::string _recipientSetName{_replSet.getURI().getSetName()}; + FailPointEnableBlock _skipAcceptanceFP{"skipShardSplitWaitForSplitAcceptance"}; }; TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) { - FailPointEnableBlock fp("skipShardSplitWaitForSplitAcceptance"); - auto opCtx = makeOperationContext(); test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); test::shard_split::reconfigToAddRecipientNodes( @@ -245,44 +246,28 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) ASSERT_EQ(_uuid, serviceInstance->getId()); auto decisionFuture = serviceInstance->decisionFuture(); - - std::shared_ptr<TopologyDescription> topologyDescriptionOld = - std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration()); - std::shared_ptr<TopologyDescription> topologyDescriptionNew = - makeRecipientTopologyDescription(_replSet); - - // Wait until the RSM has been created by the instance. - auto replicaSetMonitorCreatedFuture = serviceInstance->replicaSetMonitorCreatedFuture(); - replicaSetMonitorCreatedFuture.wait(opCtx.get()); - - // Retrieve monitor installed by _rsmMonitor.setup(...) - auto monitor = std::dynamic_pointer_cast<StreamableReplicaSetMonitor>( - ReplicaSetMonitor::createIfNeeded(_replSet.getURI())); - invariant(monitor); - auto publisher = monitor->getEventsPublisher(); - - publisher->onTopologyDescriptionChangedEvent(topologyDescriptionOld, topologyDescriptionNew); - decisionFuture.wait(); + auto result = decisionFuture.get(); + ASSERT(!result.abortReason); + ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted); + BSONObj splitConfigBson = mockReplSetReconfigCmd.getLatestConfig(); ASSERT_TRUE(splitConfigBson.hasField("replSetReconfig")); auto splitConfig = repl::ReplSetConfig::parse(splitConfigBson["replSetReconfig"].Obj()); ASSERT(splitConfig.isSplitConfig()); - auto result = decisionFuture.get(); - ASSERT(!result.abortReason); - ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted); - serviceInstance->tryForget(); + auto completionFuture = serviceInstance->completionFuture(); + completionFuture.wait(); + ASSERT_OK(serviceInstance->completionFuture().getNoThrow()); ASSERT_TRUE(serviceInstance->isGarbageCollectable()); } TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) { FailPointEnableBlock fp("pauseShardSplitAfterBlocking"); - FailPointEnableBlock fp2("skipShardSplitWaitForSplitAcceptance"); auto opCtx = makeOperationContext(); auto serviceContext = getServiceContext(); @@ -352,7 +337,7 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) { std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance; { - FailPointEnableBlock fp("pauseShardSplitBeforeBlocking"); + FailPointEnableBlock fp("pauseShardSplitAfterBlocking"); auto initialTimesEntered = fp.initialTimesEntered(); serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( @@ -386,7 +371,7 @@ TEST_F(ShardSplitDonorServiceTest, StepDownTest) { std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance; { - FailPointEnableBlock fp("pauseShardSplitBeforeBlocking"); + FailPointEnableBlock fp("pauseShardSplitAfterBlocking"); auto initialTimesEntered = fp.initialTimesEntered(); serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( @@ -415,7 +400,14 @@ TEST_F(ShardSplitDonorServiceTest, DeleteStateDocMarkedGarbageCollectable) { getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); auto stateDocument = defaultStateDocument(); - stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized); + stateDocument.setState(ShardSplitDonorStateEnum::kAborted); + stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1)); + + Status status(ErrorCodes::CallbackCanceled, "Split has been aborted"); + BSONObjBuilder bob; + status.serializeErrorToBSON(&bob); + stateDocument.setAbortReason(bob.obj()); + boost::optional<mongo::Date_t> expireAt = getServiceContext()->getFastClockSource()->now() + Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}; stateDocument.setExpireAt(expireAt); @@ -567,6 +559,55 @@ protected: std::unique_ptr<FailPointEnableBlock> _pauseBeforeRecipientCleanupFp; FailPoint::EntryCountT _initialTimesEntered; }; + +TEST_F(ShardSplitDonorServiceTest, ResumeAfterStepdownTest) { + auto opCtx = makeOperationContext(); + test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); + + auto initialFuture = [&]() { + FailPointEnableBlock fp("pauseShardSplitAfterBlocking"); + auto initialTimesEntered = fp.initialTimesEntered(); + + std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance = + ShardSplitDonorService::DonorStateMachine::getOrCreate( + opCtx.get(), _service, defaultStateDocument().toBSON()); + ASSERT(serviceInstance.get()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + stepDown(); + + return serviceInstance->decisionFuture(); + }(); + + auto result = initialFuture.getNoThrow(); + ASSERT_FALSE(result.isOK()); + ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus().code()); + + ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus()); + + auto fp = std::make_unique<FailPointEnableBlock>("pauseShardSplitAfterBlocking"); + auto initialTimesEntered = fp->initialTimesEntered(); + + stepUp(opCtx.get()); + + fp->failPoint()->waitForTimesEntered(initialTimesEntered + 1); + + std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance = + ShardSplitDonorService::DonorStateMachine::getOrCreate( + opCtx.get(), _service, defaultStateDocument().toBSON()); + ASSERT(serviceInstance.get()); + + ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus()); + + fp.reset(); + + ASSERT_OK(serviceInstance->decisionFuture().getNoThrow().getStatus()); + + serviceInstance->tryForget(); +} + class ShardSplitRecipientCleanupTest : public ShardSplitPersistenceTest { public: repl::ReplSetConfig initialDonorConfig() override { |