summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormathisbessamdb <mathis.bessa@mongodb.com>2022-10-20 14:47:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-01 14:34:49 +0000
commit21c50cca8176fa618aa45878eca007e6fb50b31f (patch)
treeb73091c95c6d3686a2a7b3ca680f4fe3249bafee
parenta4c2c33a3154c86ec245556f1122091f87d407de (diff)
downloadmongo-21c50cca8176fa618aa45878eca007e6fb50b31f.tar.gz
SERVER-67926 Delete non-existing garbage collectable tenant migration data should not cause a ConflictingInProgress error
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp32
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp134
2 files changed, 161 insertions, 5 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 7f3b1e2474f..424cb73f3a5 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -179,6 +179,8 @@ MONGO_FAIL_POINT_DEFINE(fpWaitUntilTimestampMajorityCommitted);
MONGO_FAIL_POINT_DEFINE(hangAfterUpdatingTransactionEntry);
MONGO_FAIL_POINT_DEFINE(fpBeforeAdvancingStableTimestamp);
MONGO_FAIL_POINT_DEFINE(hangMigrationBeforeRetryCheck);
+MONGO_FAIL_POINT_DEFINE(skipCreatingIndexDuringRebuildService);
+MONGO_FAIL_POINT_DEFINE(pauseTenantMigrationRecipientInstanceBeforeDeletingOldStateDoc);
namespace {
// We never restart just the oplog fetcher. If a failure occurs, we restart the whole state machine
@@ -297,6 +299,9 @@ void TenantMigrationRecipientService::abortAllMigrations(OperationContext* opCtx
ExecutorFuture<void> TenantMigrationRecipientService::_rebuildService(
std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
return AsyncTry([this] {
+ if (MONGO_unlikely(skipCreatingIndexDuringRebuildService.shouldFail())) {
+ return;
+ }
auto nss = getStateDocumentsNS();
AllowOpCtxWhenServiceRebuildingBlock allowOpCtxBlock(Client::getCurrent());
@@ -2742,15 +2747,32 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
// Otherwise, there is a real conflict so we should throw
// ConflictingInProgress.
lk.unlock();
- auto deleted =
- uassertStatusOK(tenantMigrationRecipientEntryHelpers::
- deleteStateDocIfMarkedAsGarbageCollectable(
- opCtx.get(), _tenantId));
+
+ auto existingStateDoc =
+ tenantMigrationRecipientEntryHelpers::getStateDoc(
+ opCtx.get(), mtab->getMigrationId());
+ uassertStatusOK(existingStateDoc.getStatus());
+
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream()
<< "Found active migration for tenantId \"" << _tenantId
<< "\" with migration id " << mtab->getMigrationId(),
- deleted);
+ existingStateDoc.getValue().getExpireAt());
+
+ pauseTenantMigrationRecipientInstanceBeforeDeletingOldStateDoc
+ .pauseWhileSet();
+
+ auto deleted =
+ uassertStatusOK(tenantMigrationRecipientEntryHelpers::
+ deleteStateDocIfMarkedAsGarbageCollectable(
+ opCtx.get(), _tenantId));
+ // The doc has an expireAt but was deleted before we had time to delete
+ // it above therefore it's safe to pursue since it has been cleaned up.
+ if (!deleted) {
+ LOGV2_WARNING(6792601,
+ "Existing state document was deleted before we could "
+ "delete it ourselves.");
+ }
lk.lock();
}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index acc80047451..4e5c4dd9556 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -50,6 +50,8 @@
#include "mongo/db/repl/primary_only_service_op_observer.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/repl/tenant_migration_access_blocker_registry.h"
+#include "mongo/db/repl/tenant_migration_recipient_access_blocker.h"
#include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h"
#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
@@ -3777,6 +3779,138 @@ TEST_F(TenantMigrationRecipientServiceTest,
checkStateDocPersisted(opCtx.get(), instance.get());
}
+TEST_F(TenantMigrationRecipientServiceTest,
+ RecipientDeletesExistingStateDocMarkedForGarbageCollection) {
+ FailPointEnableBlock createIndexesFailpointBlock("skipCreatingIndexDuringRebuildService");
+ stopFailPointEnableBlock fp("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc");
+ auto beforeDeleteFp = globalFailPointRegistry().find(
+ "pauseTenantMigrationRecipientInstanceBeforeDeletingOldStateDoc");
+ auto initialTimesEntered = beforeDeleteFp->setMode(FailPoint::alwaysOn);
+ auto opCtx = makeOperationContext();
+
+ // Insert a state doc to simulate running a migration with an existing state doc NOT marked for
+ // garbage collection.
+ const std::string kTenantId = "tenantA";
+ const std::string kConnectionString = "donor-rs/localhost:12345";
+ const UUID existingMigrationId = UUID::gen();
+ TenantMigrationRecipientDocument previousStateDoc(
+ existingMigrationId,
+ kConnectionString,
+ kTenantId,
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ previousStateDoc.setProtocol(MigrationProtocolEnum::kMultitenantMigrations);
+ previousStateDoc.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Starting a migration where the state is not 'kUninitialized' indicates that we are restarting
+ // from failover.
+ previousStateDoc.setState(TenantMigrationRecipientStateEnum::kStarted);
+ // Set the 'expireAt' field to indicate the migration is garbage collectable.
+ previousStateDoc.setExpireAt(opCtx->getServiceContext()->getFastClockSource()->now());
+
+ // Insert existing state document for the same tenant but different migration id.
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx.get(), previousStateDoc));
+
+ // Create the tenant access blockers for the stateDoc with the associated tenantId and
+ // migrationId.
+ auto recipientMtab = std::make_shared<TenantMigrationRecipientAccessBlocker>(
+ opCtx->getServiceContext(), existingMigrationId);
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .add(kTenantId, recipientMtab);
+
+ const UUID migrationUUID = UUID::gen();
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ kConnectionString,
+ kTenantId,
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet::primaryOnly()));
+ initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations);
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
+
+ // We block and wait right before the service deletes the previous state document.
+ beforeDeleteFp->waitForTimesEntered(initialTimesEntered + 1);
+
+ // Delete state doc while we are expecting to delete it ourselves.
+ auto deleted = uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::deleteStateDocIfMarkedAsGarbageCollectable(
+ opCtx.get(), kTenantId));
+
+ // Successfully deletes the old state document before the service deletes it itself.
+ ASSERT_TRUE(deleted);
+
+ beforeDeleteFp->setMode(FailPoint::off);
+
+ // Wait for task completion. We should not get an error since the state doc was already deleted.
+ ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow());
+}
+
+TEST_F(TenantMigrationRecipientServiceTest, RecipientFailsDueToOperationConflict) {
+ FailPointEnableBlock createIndexesFailpointBlock("skipCreatingIndexDuringRebuildService");
+ stopFailPointEnableBlock fp("fpAfterPersistingTenantMigrationRecipientInstanceStateDoc");
+
+ // Insert a state doc to simulate running a migration with an existing state doc NOT marked for
+ // garbage collection.
+ const std::string kTenantId = "tenantA";
+ const std::string kConnectionString = "donor-rs/localhost:12345";
+ const UUID existingMigrationId = UUID::gen();
+ TenantMigrationRecipientDocument previousStateDoc(
+ existingMigrationId,
+ kConnectionString,
+ kTenantId,
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly));
+ previousStateDoc.setProtocol(MigrationProtocolEnum::kMultitenantMigrations);
+ previousStateDoc.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Starting a migration where the state is not 'kUninitialized' indicates that we are restarting
+ // from failover.
+ previousStateDoc.setState(TenantMigrationRecipientStateEnum::kStarted);
+
+ auto opCtx = makeOperationContext();
+
+ // Insert existing state document for the same tenant but different migration id
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::insertStateDoc(opCtx.get(), previousStateDoc));
+
+ // Create the tenant access blockers for the stateDoc with the associated tenantId and
+ // migrationId.
+ auto recipientMtab = std::make_shared<TenantMigrationRecipientAccessBlocker>(
+ opCtx->getServiceContext(), existingMigrationId);
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .add(kTenantId, recipientMtab);
+
+ const UUID migrationUUID = UUID::gen();
+ TenantMigrationRecipientDocument initialStateDocument(
+ migrationUUID,
+ kConnectionString,
+ kTenantId,
+ kDefaultStartMigrationTimestamp,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly, TagSet::primaryOnly()));
+ initialStateDocument.setProtocol(MigrationProtocolEnum::kMultitenantMigrations);
+ initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload);
+
+ // Create and start the instance.
+ auto instance = TenantMigrationRecipientService::Instance::getOrCreate(
+ opCtx.get(), _service, initialStateDocument.toBSON());
+ ASSERT(instance.get());
+ ASSERT_EQ(migrationUUID, instance->getMigrationUUID());
+
+ // Since the previous state doc did not have expireAt set we will assert with
+ // ConflictingOperationInProgress.
+ ASSERT_EQ(ErrorCodes::ConflictingOperationInProgress,
+ instance->getDataSyncCompletionFuture().getNoThrow().code());
+ ASSERT_EQ(instance->getForgetMigrationDurableFuture().getNoThrow(),
+ ErrorCodes::ConflictingOperationInProgress);
+}
#endif
} // namespace repl
} // namespace mongo