summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authormathisbessamdb <mathis.bessa@mongodb.com>2022-03-09 19:13:08 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-09 20:59:03 +0000
commitb9c33f3ef53a686308b363485c6a90a2c0c614b8 (patch)
tree2212371e7aa757bc27c71e72acb2c93336cf8ce9 /src/mongo
parentf0b8cf638b620660d1efff0207adff32b86dc72f (diff)
downloadmongo-b9c33f3ef53a686308b363485c6a90a2c0c614b8.tar.gz
SERVER-63090 Recipient garbarge collects itself on primary step up
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/primary_only_service_test_fixture.cpp4
-rw-r--r--src/mongo/db/repl/primary_only_service_test_fixture.h11
-rw-r--r--src/mongo/db/serverless/shard_split_donor_op_observer.cpp7
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp55
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.h7
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service_test.cpp103
-rw-r--r--src/mongo/db/serverless/shard_split_utils.cpp34
-rw-r--r--src/mongo/db/serverless/shard_split_utils.h18
8 files changed, 232 insertions, 7 deletions
diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.cpp b/src/mongo/db/repl/primary_only_service_test_fixture.cpp
index 9a770b487c7..70f7453cf9e 100644
--- a/src/mongo/db/repl/primary_only_service_test_fixture.cpp
+++ b/src/mongo/db/repl/primary_only_service_test_fixture.cpp
@@ -57,6 +57,10 @@ void PrimaryOnlyServiceMongoDTest::setUp() {
repl::createOplog(opCtx.get());
+ // This method was added in order to write data on disk during setUp which is called
+ // during a test case construction.
+ setUpPersistence(opCtx.get());
+
// Set up OpObserverImpl so that repl::logOp() will store the oplog entry's optime in
// ReplClientInfo.
_opObserverRegistry = dynamic_cast<OpObserverRegistry*>(serviceContext->getOpObserver());
diff --git a/src/mongo/db/repl/primary_only_service_test_fixture.h b/src/mongo/db/repl/primary_only_service_test_fixture.h
index e7bdbafbe1c..8ca3dae3bb2 100644
--- a/src/mongo/db/repl/primary_only_service_test_fixture.h
+++ b/src/mongo/db/repl/primary_only_service_test_fixture.h
@@ -62,8 +62,19 @@ protected:
virtual std::unique_ptr<repl::PrimaryOnlyService> makeService(
ServiceContext* serviceContext) = 0;
+
+ /**
+ * Used to add your own op observer to the op observer registry during setUp prior to running
+ * your tests.
+ */
virtual void setUpOpObserverRegistry(OpObserverRegistry* opObserverRegistry){};
+ /**
+ * Used in order to set persistent data (such as state doc on disk) during setUp prior to
+ * running your tests.
+ */
+ virtual void setUpPersistence(OperationContext* opCtx){};
+
OpObserverRegistry* _opObserverRegistry = nullptr;
repl::PrimaryOnlyServiceRegistry* _registry = nullptr;
repl::PrimaryOnlyService* _service = nullptr;
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index 0f40fd7a434..e6c635451e5 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -374,10 +374,13 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
}
auto donorStateDoc = parseAndValidateDonorDocument(doc);
+
uassert(ErrorCodes::IllegalOperation,
str::stream() << "cannot delete a donor's state document " << doc
- << " since it has not been marked as garbage collectable.",
- donorStateDoc.getExpireAt());
+ << " since it has not been marked as garbage collectable and is not a"
+ << " recipient garbage collectable.",
+ donorStateDoc.getExpireAt() ||
+ serverless::shouldRemoveStateDocumentOnRecipient(opCtx, donorStateDoc));
if (donorStateDoc.getTenantIds()) {
auto tenantIds = *donorStateDoc.getTenantIds();
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index ce25255e167..8bef0396098 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -107,6 +107,7 @@ void checkForTokenInterrupt(const CancellationToken& token) {
MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlocking);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeRecipientCleanup);
const std::string kTTLIndexName = "ShardSplitDonorTTLIndex";
@@ -293,7 +294,6 @@ Status ShardSplitDonorService::DonorStateMachine::checkIfOptionsConflict(
SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
ScopedTaskExecutorPtr executor, const CancellationToken& primaryToken) noexcept {
-
auto abortToken = [&]() {
stdx::lock_guard<Latch> lg(_mutex);
_abortSource = CancellationSource(primaryToken);
@@ -306,6 +306,32 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
_markKilledExecutor->startup();
_cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor);
+
+ const bool shouldRemoveStateDocumentOnRecipient = [&]() {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ stdx::lock_guard<Latch> lg(_mutex);
+ return serverless::shouldRemoveStateDocumentOnRecipient(opCtx.get(), _stateDoc);
+ }();
+
+ if (shouldRemoveStateDocumentOnRecipient) {
+ LOGV2(6309000,
+ "Cancelling and cleaning up shard split operation on recipient in blocking state.",
+ "id"_attr = _migrationId);
+ pauseShardSplitBeforeRecipientCleanup.pauseWhileSet();
+ _decisionPromise.setWith([&] {
+ return ExecutorFuture(**executor)
+ .then([this, executor, primaryToken] {
+ return _cleanRecipientStateDoc(executor, primaryToken);
+ })
+ .unsafeToInlineFuture();
+ });
+
+ _completionPromise.setFrom(
+ _decisionPromise.getFuture().semi().ignoreValue().unsafeToInlineFuture());
+
+ return _completionPromise.getFuture().semi();
+ }
+
_initiateTimeout(executor, abortToken);
LOGV2(6086506,
@@ -834,4 +860,31 @@ ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageColle
});
}
+ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState>
+ShardSplitDonorService::DonorStateMachine::_cleanRecipientStateDoc(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
+
+ return AsyncTry([this, self = shared_from_this()] {
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto deleted =
+ uassertStatusOK(serverless::deleteStateDoc(opCtx.get(), _migrationId));
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream()
+ << "Did not find active shard split with migration id " << _migrationId,
+ deleted);
+ return repl::ReplClientInfo::forClient(opCtx.get()->getClient()).getLastOp();
+ })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, token)
+ .ignoreValue()
+ .then([this, executor]() {
+ LOGV2(6236607,
+ "Cleanup stale shard split operation on recipient.",
+ "migrationId"_attr = _migrationId);
+ stdx::lock_guard<Latch> lg(_mutex);
+ return DurableState{_stateDoc.getState()};
+ });
+}
+
} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index 96aa2ca4de9..313b49a6d5c 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -197,6 +197,13 @@ private:
ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible(
const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+ /*
+ * We need to call this method when we find out the replica set name is the same as the state
+ * doc recipient set name and the current state doc state is blocking.
+ */
+ ExecutorFuture<DurableState> _cleanRecipientStateDoc(const ScopedTaskExecutorPtr& executor,
+ const CancellationToken& token);
+
private:
const NamespaceString _stateDocumentsNS = NamespaceString::kTenantSplitDonorsNamespace;
mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardSplitDonorService::_mutex");
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index d2cfb46d80c..b11541f58c3 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -316,7 +316,8 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) {
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
- auto stateDocument = defaultStateDocument();
+ auto stateDocument = ShardSplitDonorDocument::parse(
+ {"donor.document"}, BSON("_id" << _uuid << "tenantIds" << _tenantIds));
stateDocument.setState(ShardSplitDonorStateEnum::kAborted);
auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
@@ -408,7 +409,6 @@ TEST_F(ShardSplitDonorServiceTest, StepUpWithkCommitted) {
test::shard_split::reconfigToAddRecipientNodes(
getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
- auto nss = NamespaceString::kTenantSplitDonorsNamespace;
auto stateDocument = defaultStateDocument();
stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized);
@@ -445,6 +445,32 @@ TEST_F(ShardSplitDonorServiceTest, StepUpWithkCommitted) {
ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
+TEST_F(ShardSplitDonorServiceTest, DeleteStateDocMarkedGarbageCollectable) {
+ auto opCtx = makeOperationContext();
+
+ test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientSet.getHosts());
+
+ auto stateDocument = defaultStateDocument();
+ stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized);
+ boost::optional<mongo::Date_t> expireAt = getServiceContext()->getFastClockSource()->now() +
+ Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()};
+ stateDocument.setExpireAt(expireAt);
+
+ // insert the document for the first time.
+ ASSERT_OK(serverless::insertStateDoc(opCtx.get(), stateDocument));
+
+ // deletes a document that was marked as garbage collectable and succeeds.
+ StatusWith<bool> deleted = serverless::deleteStateDoc(opCtx.get(), stateDocument.getId());
+
+ ASSERT_OK(deleted.getStatus());
+ ASSERT_TRUE(deleted.getValue());
+
+ ASSERT_EQ(serverless::getStateDocument(opCtx.get(), _uuid).getStatus().code(),
+ ErrorCodes::NoMatchingDocument);
+}
+
class SplitReplicaSetObserverTest : public ServiceContextTest {
public:
void setUp() override {
@@ -554,4 +580,77 @@ TEST_F(SplitReplicaSetObserverTest, ExecutorCanceled) {
ASSERT_EQ(future.getNoThrow().code(), ErrorCodes::ShutdownInProgress);
}
+class ShardSplitRecipientCleanupTest : public ShardSplitDonorServiceTest {
+public:
+ void setUpPersistence(OperationContext* opCtx) override {
+
+ // We need to allow writes during the test's setup.
+ auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
+ repl::ReplicationCoordinator::get(opCtx->getServiceContext()));
+ replCoord->alwaysAllowWrites(true);
+
+ BSONArrayBuilder members;
+ members.append(BSON("_id" << 1 << "host"
+ << "node1"
+ << "tags" << BSON("recipientTagName" << UUID::gen().toString())));
+
+ auto newConfig = repl::ReplSetConfig::parse(BSON("_id" << _recipientSetName << "version"
+ << 1 << "protocolVersion" << 1
+ << "members" << members.arr()));
+ replCoord->setGetConfigReturnValue(newConfig);
+
+ auto stateDocument = defaultStateDocument();
+ stateDocument.setBlockTimestamp(Timestamp(1, 1));
+ stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
+
+ _recStateDoc = stateDocument;
+ uassertStatusOK(serverless::insertStateDoc(opCtx, stateDocument));
+
+ _pauseBeforeRecipientCleanupFp =
+ std::make_unique<FailPointEnableBlock>("pauseShardSplitBeforeRecipientCleanup");
+
+ _initialTimesEntered = _pauseBeforeRecipientCleanupFp->initialTimesEntered();
+ }
+
+protected:
+ ShardSplitDonorDocument _recStateDoc;
+ std::unique_ptr<FailPointEnableBlock> _pauseBeforeRecipientCleanupFp;
+ FailPoint::EntryCountT _initialTimesEntered;
+};
+
+TEST_F(ShardSplitRecipientCleanupTest, ShardSplitRecipientCleanup) {
+ auto opCtx = makeOperationContext();
+ test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+
+ ASSERT_OK(serverless::getStateDocument(opCtx.get(), _uuid).getStatus());
+
+ auto decisionFuture = [&]() {
+ ASSERT(_pauseBeforeRecipientCleanupFp);
+ (*(_pauseBeforeRecipientCleanupFp.get()))->waitForTimesEntered(_initialTimesEntered + 1);
+
+ auto splitService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(ShardSplitDonorService::kServiceName);
+ auto optionalDonor = ShardSplitDonorService::DonorStateMachine::lookup(
+ opCtx.get(), splitService, BSON("_id" << _uuid));
+
+ ASSERT_TRUE(optionalDonor);
+ auto serviceInstance = optionalDonor.get();
+ ASSERT(serviceInstance.get());
+
+ _pauseBeforeRecipientCleanupFp.reset();
+
+ return serviceInstance->decisionFuture();
+ }();
+
+ auto result = decisionFuture.get();
+
+ // We set the promise before the future chain. State will stay kBlocking with no abort.
+ ASSERT(!result.abortReason);
+ ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kBlocking);
+
+ // deleted the local state doc so this should return NoMatchingDocument
+ ASSERT_EQ(serverless::getStateDocument(opCtx.get(), _uuid).getStatus().code(),
+ ErrorCodes::NoMatchingDocument);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp
index c67aa3b4aad..f2db7aaf4d1 100644
--- a/src/mongo/db/serverless/shard_split_utils.cpp
+++ b/src/mongo/db/serverless/shard_split_utils.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/ops/delete.h"
#include "mongo/db/repl/repl_set_config.h"
namespace mongo {
@@ -188,8 +189,11 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
}
BSONObj result;
- auto foundDoc = Helpers::findOne(
- opCtx, collection.getCollection(), BSON("_id" << shardSplitId), result, true);
+ auto foundDoc = Helpers::findOne(opCtx,
+ collection.getCollection(),
+ BSON(ShardSplitDonorDocument::kIdFieldName << shardSplitId),
+ result,
+ true);
if (!foundDoc) {
return Status(ErrorCodes::NoMatchingDocument,
@@ -207,6 +211,32 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
}
}
+StatusWith<bool> deleteStateDoc(OperationContext* opCtx, const UUID& shardSplitId) {
+ const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ if (!collection) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << nss.ns() << " does not exist");
+ }
+ auto query = BSON(ShardSplitDonorDocument::kIdFieldName << shardSplitId);
+ return writeConflictRetry(opCtx, "ShardSplitDonorDeleteStateDoc", nss.ns(), [&]() -> bool {
+ auto nDeleted =
+ deleteObjects(opCtx, collection.getCollection(), nss, query, true /* justOne */);
+ return nDeleted > 0;
+ });
+}
+
+bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx,
+ const ShardSplitDonorDocument& stateDocument) {
+ if (!stateDocument.getRecipientSetName()) {
+ return false;
+ }
+ auto recipientSetName = *stateDocument.getRecipientSetName();
+ auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+ return recipientSetName == config.getReplSetName() &&
+ stateDocument.getState() == ShardSplitDonorStateEnum::kBlocking;
+}
} // namespace serverless
} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h
index 99060abfa5c..ec2f1becd58 100644
--- a/src/mongo/db/serverless/shard_split_utils.h
+++ b/src/mongo/db/serverless/shard_split_utils.h
@@ -86,6 +86,15 @@ Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st
*/
Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc);
+
+/**
+ * Deletes a state documents in the database for a recipient if the state is blocking at startup.
+ *
+ * Returns 'NamespaceNotFound' if no matching namespace is found. Returns true if the doc was
+ * removed.
+ */
+StatusWith<bool> deleteStateDoc(OperationContext* opCtx, const UUID& shardSplitId);
+
/**
* Returns the state doc matching the document with shardSplitId from the disk if it
* exists. Reads at "no" timestamp i.e, reading with the "latest" snapshot reflecting up to date
@@ -99,5 +108,14 @@ Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st
StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
const UUID& shardSplitId);
+/**
+ * Returns true if the state document should be removed for a shard split recipient which is based
+ * on having a local state doc in kBlocking state and having matching recipientSetName matching the
+ * config.replSetName.
+ */
+bool shouldRemoveStateDocumentOnRecipient(OperationContext* opCtx,
+ const ShardSplitDonorDocument& stateDocument);
+
+
} // namespace serverless
} // namespace mongo