diff options
author | mathisbessamdb <mathis.bessa@mongodb.com> | 2022-03-09 19:13:08 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-09 20:59:03 +0000 |
commit | b9c33f3ef53a686308b363485c6a90a2c0c614b8 (patch) | |
tree | 2212371e7aa757bc27c71e72acb2c93336cf8ce9 /src/mongo | |
parent | f0b8cf638b620660d1efff0207adff32b86dc72f (diff) | |
download | mongo-b9c33f3ef53a686308b363485c6a90a2c0c614b8.tar.gz |
SERVER-63090 Recipient garbage collects itself on primary step up
Diffstat (limited to 'src/mongo')
8 files changed, 232 insertions, 7 deletions
diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.cpp b/src/mongo/db/repl/primary_only_service_test_fixture.cpp index 9a770b487c7..70f7453cf9e 100644 --- a/src/mongo/db/repl/primary_only_service_test_fixture.cpp +++ b/src/mongo/db/repl/primary_only_service_test_fixture.cpp @@ -57,6 +57,10 @@ void PrimaryOnlyServiceMongoDTest::setUp() { repl::createOplog(opCtx.get()); + // This method was added in order to write data on disk during setUp which is called + // during a test case construction. + setUpPersistence(opCtx.get()); + // Set up OpObserverImpl so that repl::logOp() will store the oplog entry's optime in // ReplClientInfo. _opObserverRegistry = dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver()); diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.h b/src/mongo/db/repl/primary_only_service_test_fixture.h index e7bdbafbe1c..8ca3dae3bb2 100644 --- a/src/mongo/db/repl/primary_only_service_test_fixture.h +++ b/src/mongo/db/repl/primary_only_service_test_fixture.h @@ -62,8 +62,19 @@ protected: virtual std::unique_ptr<repl::PrimaryOnlyService> makeService( ServiceContext* serviceContext) = 0; + + /** + * Used to add your own op observer to the op observer registry during setUp prior to running + * your tests. + */ virtual void setUpOpObserverRegistry(OpObserverRegistry* opObserverRegistry){}; + /** + * Used in order to set persistent data (such as state doc on disk) during setUp prior to + * running your tests. 
+ */ + virtual void setUpPersistence(OperationContext* opCtx){}; + OpObserverRegistry* _opObserverRegistry = nullptr; repl::PrimaryOnlyServiceRegistry* _registry = nullptr; repl::PrimaryOnlyService* _service = nullptr; diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index 0f40fd7a434..e6c635451e5 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -374,10 +374,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, } auto donorStateDoc = parseAndValidateDonorDocument(doc); + uassert(ErrorCodes::IllegalOperation, str::stream() << "cannot delete a donor's state document " << doc - << " since it has not been marked as garbage collectable.", - donorStateDoc.getExpireAt()); + << " since it has not been marked as garbage collectable and is not a" + << " recipient garbage collectable.", + donorStateDoc.getExpireAt() || + serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc)); if (donorStateDoc.getTenantIds()) { auto tenantIds = *donorStateDoc.getTenantIds(); diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index ce25255e167..8bef0396098 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -107,6 +107,7 @@ void checkForTokenInterrupt(const CancellationToken& token) { MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlocking); MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking); MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance); +MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeRecipientCleanup); const std::string kTTLIndexName = "ShardSplitDonorTTLIndex"; @@ -293,7 +294,6 @@ Status ShardSplitDonorService::DonorStateMachine::checkIfOptionsConflict( SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( 
ScopedTaskExecutorPtr executor, const CancellationToken& primaryToken) noexcept { - auto abortToken = [&]() { stdx::lock_guard<Latch> lg(_mutex); _abortSource = CancellationSource(primaryToken); @@ -306,6 +306,32 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( _markKilledExecutor->startup(); _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor); + + const bool shouldRemoveStateDocumentOnRecipient = [&]() { + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + stdx::lock_guard<Latch> lg(_mutex); + return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc); + }(); + + if (shouldRemoveStateDocumentOnRecipient) { + LOGV2(6309000, + "Cancelling and cleaning up shard split operation on recipient in blocking state.", + "id"_attr = _migrationId); + pauseShardSplitBeforeRecipientCleanup.pauseWhileSet(); + _decisionPromise.setWith([&] { + return ExecutorFuture(**executor) + .then([this, executor, primaryToken] { + return _cleanRecipientStateDoc(executor, primaryToken); + }) + .unsafeToInlineFuture(); + }); + + _completionPromise.setFrom( + _decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture()); + + return _completionPromise.getFuture().semi(); + } + _initiateTimeout(executor, abortToken); LOGV2(6086506, @@ -834,4 +860,31 @@ ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageColle }); } +ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState> +ShardSplitDonorService::DonorStateMachine::_cleanRecipientStateDoc( + const ScopedTaskExecutorPtr& executor, const CancellationToken& token) { + + return AsyncTry([this, self = shared_from_this()] { + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto deleted = + uassertStatusOK(serverless::deleteStateDoc(opCtx.get(), _migrationId)); + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() + << "Did not find active shard split with migration id " << _migrationId, + 
deleted); + return repl::ReplClientInfo::forClient(opCtx.get()->getClient()).getLastOp(); + }) + .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) + .withBackoffBetweenIterations(kExponentialBackoff) + .on(**executor, token) + .ignoreValue() + .then([this, executor]() { + LOGV2(6236607, + "Cleanup stale shard split operation on recipient.", + "migrationId"_attr = _migrationId); + stdx::lock_guard<Latch> lg(_mutex); + return DurableState{_stateDoc.getState()}; + }); +} + } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h index 96aa2ca4de9..313b49a6d5c 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.h +++ b/src/mongo/db/serverless/shard_split_donor_service.h @@ -197,6 +197,13 @@ private: ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible( const ScopedTaskExecutorPtr& executor, const CancellationToken& token); + /* + * We need to call this method when we find out the replica set name is the same as the state + * doc recipient set name and the current state doc state is blocking. 
+ */ + ExecutorFuture<DurableState> _cleanRecipientStateDoc(const ScopedTaskExecutorPtr& executor, + const CancellationToken& token); + private: const NamespaceString _stateDocumentsNS = NamespaceString::kTenantSplitDonorsNamespace; mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardSplitDonorService::_mutex"); diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index d2cfb46d80c..b11541f58c3 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -316,7 +316,8 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) { test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); - auto stateDocument = defaultStateDocument(); + auto stateDocument = ShardSplitDonorDocument::parse( + {"donor.document"}, BSON("_id" << _uuid << "tenantIds" << _tenantIds)); stateDocument.setState(ShardSplitDonorStateEnum::kAborted); auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate( @@ -408,7 +409,6 @@ TEST_F(ShardSplitDonorServiceTest, StepUpWithkCommitted) { test::shard_split::reconfigToAddRecipientNodes( getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); - auto nss = NamespaceString::kTenantSplitDonorsNamespace; auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized); @@ -445,6 +445,32 @@ TEST_F(ShardSplitDonorServiceTest, StepUpWithkCommitted) { ASSERT_TRUE(serviceInstance->isGarbageCollectable()); } +TEST_F(ShardSplitDonorServiceTest, DeleteStateDocMarkedGarbageCollectable) { + auto opCtx = makeOperationContext(); + + test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts()); + + auto stateDocument = 
defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized); + boost::optional<mongo::Date_t> expireAt = getServiceContext()->getFastClockSource()->now() + + Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}; + stateDocument.setExpireAt(expireAt); + + // insert the document for the first time. + ASSERT_OK(serverless::insertStateDoc(opCtx.get(), stateDocument)); + + // deletes a document that was marked as garbage collectable and succeeds. + StatusWith<bool> deleted = serverless::deleteStateDoc(opCtx.get(), stateDocument.getId()); + + ASSERT_OK(deleted.getStatus()); + ASSERT_TRUE(deleted.getValue()); + + ASSERT_EQ(serverless::getStateDocument(opCtx.get(), _uuid).getStatus().code(), + ErrorCodes::NoMatchingDocument); +} + class SplitReplicaSetObserverTest : public ServiceContextTest { public: void setUp() override { @@ -554,4 +580,77 @@ TEST_F(SplitReplicaSetObserverTest, ExecutorCanceled) { ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::ShutdownInProgress); } +class ShardSplitRecipientCleanupTest : public ShardSplitDonorServiceTest { +public: + void setUpPersistence(OperationContext* opCtx) override { + + // We need to allow writes during the test's setup. 
+ auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>( + repl::ReplicationCoordinator::get(opCtx->getServiceContext())); + replCoord->alwaysAllowWrites(true); + + BSONArrayBuilder members; + members.append(BSON("_id" << 1 << "host" + << "node1" + << "tags" << BSON("recipientTagName" << UUID::gen().toString()))); + + auto newConfig = repl::ReplSetConfig::parse(BSON("_id" << _recipientSetName << "version" + << 1 << "protocolVersion" << 1 + << "members" << members.arr())); + replCoord->setGetConfigReturnValue(newConfig); + + auto stateDocument = defaultStateDocument(); + stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); + + _recStateDoc = stateDocument; + uassertStatusOK(serverless::insertStateDoc(opCtx, stateDocument)); + + _pauseBeforeRecipientCleanupFp = + std::make_unique<FailPointEnableBlock>("pauseShardSplitBeforeRecipientCleanup"); + + _initialTimesEntered = _pauseBeforeRecipientCleanupFp->initialTimesEntered(); + } + +protected: + ShardSplitDonorDocument _recStateDoc; + std::unique_ptr<FailPointEnableBlock> _pauseBeforeRecipientCleanupFp; + FailPoint::EntryCountT _initialTimesEntered; +}; + +TEST_F(ShardSplitRecipientCleanupTest, ShardSplitRecipientCleanup) { + auto opCtx = makeOperationContext(); + test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get()); + + ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus()); + + auto decisionFuture = [&]() { + ASSERT(_pauseBeforeRecipientCleanupFp); + (*(_pauseBeforeRecipientCleanupFp.get()))->waitForTimesEntered(_initialTimesEntered + 1); + + auto splitService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(ShardSplitDonorService::kServiceName); + auto optionalDonor = ShardSplitDonorService::DonorStateMachine::lookup( + opCtx.get(), splitService, BSON("_id" << _uuid)); + + ASSERT_TRUE(optionalDonor); + auto serviceInstance = optionalDonor.get(); + 
ASSERT(serviceInstance.get()); + + _pauseBeforeRecipientCleanupFp.reset(); + + return serviceInstance->decisionFuture(); + }(); + + auto result = decisionFuture.get(); + + // We set the promise before the future chain. State will stay kBlocking with no abort. + ASSERT(!result.abortReason); + ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kBlocking); + + // deleted the local state doc so this should return NoMatchingDocument + ASSERT_EQ(serverless::getStateDocument(opCtx.get(), _uuid).getStatus().code(), + ErrorCodes::NoMatchingDocument); +} + } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp index c67aa3b4aad..f2db7aaf4d1 100644 --- a/src/mongo/db/serverless/shard_split_utils.cpp +++ b/src/mongo/db/serverless/shard_split_utils.cpp @@ -33,6 +33,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/ops/delete.h" #include "mongo/db/repl/repl_set_config.h" namespace mongo { @@ -188,8 +189,11 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx, } BSONObj result; - auto foundDoc = Helpers::findOne( - opCtx, collection.getCollection(), BSON("_id" << shardSplitId), result, true); + auto foundDoc = Helpers::findOne(opCtx, + collection.getCollection(), + BSON(ShardSplitDonorDocument::kIdFieldName << shardSplitId), + result, + true); if (!foundDoc) { return Status(ErrorCodes::NoMatchingDocument, @@ -207,6 +211,32 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx, } } +StatusWith<bool> deleteStateDoc(OperationContext* opCtx, const UUID& shardSplitId) { + const auto nss = NamespaceString::kTenantSplitDonorsNamespace; + AutoGetCollection collection(opCtx, nss, MODE_IX); + + if (!collection) { + return Status(ErrorCodes::NamespaceNotFound, + str::stream() << nss.ns() << " does not exist"); + } + auto query = 
BSON(ShardSplitDonorDocument::kIdFieldName << shardSplitId); + return writeConflictRetry(opCtx, "ShardSplitDonorDeleteStateDoc", nss.ns(), [&]() -> bool { + auto nDeleted = + deleteObjects(opCtx, collection.getCollection(), nss, query, true /* justOne */); + return nDeleted > 0; + }); +} + +bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx, + const ShardSplitDonorDocument& stateDocument) { + if (!stateDocument.getRecipientSetName()) { + return false; + } + auto recipientSetName = *stateDocument.getRecipientSetName(); + auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); + return recipientSetName == config.getReplSetName() && + stateDocument.getState() == ShardSplitDonorStateEnum::kBlocking; +} } // namespace serverless } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h index 99060abfa5c..ec2f1becd58 100644 --- a/src/mongo/db/serverless/shard_split_utils.h +++ b/src/mongo/db/serverless/shard_split_utils.h @@ -86,6 +86,15 @@ Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st */ Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc); + +/** + * Deletes a state documents in the database for a recipient if the state is blocking at startup. + * + * Returns 'NamespaceNotFound' if no matching namespace is found. Returns true if the doc was + * removed. + */ +StatusWith<bool> deleteStateDoc(OperationContext* opCtx, const UUID& shardSplitId); + /** * Returns the state doc matching the document with shardSplitId from the disk if it * exists. 
Reads at "no" timestamp i.e, reading with the "latest" snapshot reflecting up to date @@ -99,5 +108,14 @@ Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx, const UUID& shardSplitId); +/** + * Returns true if the state document should be removed for a shard split recipient which is based + * on having a local state doc in kBlocking state and having matching recipientSetName matching the + * config.replSetName. + */ +bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx, + const ShardSplitDonorDocument& stateDocument); + + } // namespace serverless } // namespace mongo |