diff options
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service_test.cpp | 148 |
2 files changed, 91 insertions, 60 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 1af4c492723..db394bda552 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -284,6 +284,8 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( _markKilledExecutor->startup(); _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor); + pauseShardSplitBeforeRecipientCleanup.pauseWhileSet(); + const bool shouldRemoveStateDocumentOnRecipient = [&]() { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); stdx::lock_guard<Latch> lg(_mutex); @@ -294,7 +296,6 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( LOGV2(6309000, "Cancelling and cleaning up shard split operation on recipient in blocking state.", "id"_attr = _migrationId); - pauseShardSplitBeforeRecipientCleanup.pauseWhileSet(); _decisionPromise.setWith([&] { return ExecutorFuture(**executor) .then([this, executor, primaryToken, anchor = shared_from_this()] { diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index 069c4926fb3..ddcfdba81aa 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -407,49 +407,6 @@ TEST_F(ShardSplitDonorServiceTest, StepDownTest) { ASSERT_FALSE(serviceInstance->isGarbageCollectable()); } -TEST_F(ShardSplitDonorServiceTest, StepUpWithkCommitted) { - auto opCtx = makeOperationContext(); - - test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); - test::shard_split::reconfigToAddRecipientNodes( - getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); - - auto stateDocument = defaultStateDocument(); - stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized); - - // insert the document for the first time. - ASSERT_OK(serverless::insertStateDoc(opCtx.get(), stateDocument)); - - { - // update to kCommitted - stateDocument.setState(ShardSplitDonorStateEnum::kCommitted); - boost::optional<mongo::Date_t> expireAt = getServiceContext()->getFastClockSource()->now() + - Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}; - stateDocument.setExpireAt(expireAt); - stateDocument.setBlockTimestamp(Timestamp(1, 1)); - stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1)); - - ASSERT_OK(serverless::updateStateDoc(opCtx.get(), stateDocument)); - - auto foundStateDoc = uassertStatusOK(serverless::getStateDocument(opCtx.get(), _uuid)); - invariant(foundStateDoc.getExpireAt()); - ASSERT_EQ(*foundStateDoc.getExpireAt(), *expireAt); - } - - auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( - opCtx.get(), _service, stateDocument.toBSON()); - ASSERT(serviceInstance.get()); - - auto result = serviceInstance->decisionFuture().get(); - ASSERT(!result.abortReason); - ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted); - - // we don't need to call tryForget since expireAt is already set the completionPromise will - // complete. - ASSERT_OK(serviceInstance->completionFuture().getNoThrow()); - ASSERT_TRUE(serviceInstance->isGarbageCollectable()); -} - TEST_F(ShardSplitDonorServiceTest, DeleteStateDocMarkedGarbageCollectable) { auto opCtx = makeOperationContext(); @@ -581,7 +538,7 @@ TEST_F(SplitReplicaSetObserverTest, FutureNotReadyWrongSet) { ASSERT_FALSE(listener.getFuture().isReady()); } -class ShardSplitRecipientCleanupTest : public ShardSplitDonorServiceTest { +class ShardSplitPersistenceTest : public ShardSplitDonorServiceTest { public: void setUpPersistence(OperationContext* opCtx) override { @@ -590,22 +547,10 @@ public: repl::ReplicationCoordinator::get(opCtx->getServiceContext())); replCoord->alwaysAllowWrites(true); - BSONArrayBuilder members; - members.append(BSON("_id" << 1 << "host" - << "node1" - << "tags" << BSON("recipientTagName" << UUID::gen().toString()))); - - auto newConfig = repl::ReplSetConfig::parse(BSON("_id" << _recipientSetName << "version" - << 1 << "protocolVersion" << 1 - << "members" << members.arr())); - replCoord->setGetConfigReturnValue(newConfig); - - auto stateDocument = defaultStateDocument(); - stateDocument.setBlockTimestamp(Timestamp(1, 1)); - stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); + replCoord->setGetConfigReturnValue(initialDonorConfig()); - _recStateDoc = stateDocument; - uassertStatusOK(serverless::insertStateDoc(opCtx, stateDocument)); + _recStateDoc = initialStateDocument(); + uassertStatusOK(serverless::insertStateDoc(opCtx, _recStateDoc)); _pauseBeforeRecipientCleanupFp = std::make_unique<FailPointEnableBlock>("pauseShardSplitBeforeRecipientCleanup"); @@ -613,11 +558,37 @@ public: _initialTimesEntered = _pauseBeforeRecipientCleanupFp->initialTimesEntered(); } + virtual repl::ReplSetConfig initialDonorConfig() = 0; + + virtual ShardSplitDonorDocument initialStateDocument() = 0; + protected: ShardSplitDonorDocument _recStateDoc; std::unique_ptr<FailPointEnableBlock> _pauseBeforeRecipientCleanupFp; FailPoint::EntryCountT _initialTimesEntered; }; +class ShardSplitRecipientCleanupTest : public ShardSplitPersistenceTest { +public: + repl::ReplSetConfig initialDonorConfig() override { + BSONArrayBuilder members; + members.append(BSON("_id" << 1 << "host" + << "node1" + << "tags" << BSON("recipientTagName" << UUID::gen().toString()))); + + return repl::ReplSetConfig::parse(BSON("_id" << _recipientSetName << "version" << 1 + << "protocolVersion" << 1 << "members" + << members.arr())); + } + + ShardSplitDonorDocument initialStateDocument() override { + + auto stateDocument = defaultStateDocument(); + stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); + + return stateDocument; + } +}; TEST_F(ShardSplitRecipientCleanupTest, ShardSplitRecipientCleanup) { auto opCtx = makeOperationContext(); @@ -654,4 +625,63 @@ TEST_F(ShardSplitRecipientCleanupTest, ShardSplitRecipientCleanup) { ErrorCodes::NoMatchingDocument); } +class ShardSplitStepUpWithCommitted : public ShardSplitPersistenceTest { + repl::ReplSetConfig initialDonorConfig() override { + return _replSet.getReplConfig(); + } + + ShardSplitDonorDocument initialStateDocument() override { + + auto stateDocument = defaultStateDocument(); + + stateDocument.setState(ShardSplitDonorStateEnum::kCommitted); + _expireAt = getServiceContext()->getFastClockSource()->now() + + Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}; + stateDocument.setExpireAt(_expireAt); + stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1)); + + return stateDocument; + } + +protected: + boost::optional<mongo::Date_t> _expireAt; +}; + +TEST_F(ShardSplitStepUpWithCommitted, StepUpWithkCommitted) { + auto opCtx = makeOperationContext(); + + test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); + + auto foundStateDoc = uassertStatusOK(serverless::getStateDocument(opCtx.get(), _uuid)); + invariant(foundStateDoc.getExpireAt()); + ASSERT_EQ(*foundStateDoc.getExpireAt(), *_expireAt); + + ASSERT(_pauseBeforeRecipientCleanupFp); + _pauseBeforeRecipientCleanupFp.get()->failPoint()->waitForTimesEntered(_initialTimesEntered + + 1); + + auto splitService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(ShardSplitDonorService::kServiceName); + auto optionalInstance = ShardSplitDonorService::DonorStateMachine::lookup( + opCtx.get(), splitService, BSON("_id" << _uuid)); + + ASSERT(optionalInstance); + _pauseBeforeRecipientCleanupFp.reset(); + + auto serviceInstance = optionalInstance->get(); + + + auto result = serviceInstance->decisionFuture().get(); + ASSERT(!result.abortReason); + ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted); + + // we don't need to call tryForget since expireAt is already set the completionPromise will + // complete. + ASSERT_OK(serviceInstance->completionFuture().getNoThrow()); + ASSERT_TRUE(serviceInstance->isGarbageCollectable()); +} + } // namespace mongo |