diff options
author | Didier Nadeau <didier.nadeau@mongodb.com> | 2022-12-22 19:28:33 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-22 20:27:26 +0000 |
commit | 7f8a654e5dbb9c81a5faa5bfdf4cd8233f191053 (patch) | |
tree | a4feb49e9f2a93457944b83d4143058ab8dd3d87 /src/mongo/db/repl | |
parent | b29760cd7c8723790b448356b306599307c540ce (diff) | |
download | mongo-7f8a654e5dbb9c81a5faa5bfdf4cd8233f191053.tar.gz |
SERVER-71263 Create RTAB at the start of merge using the tenantIds
Diffstat (limited to 'src/mongo/db/repl')
7 files changed, 215 insertions, 72 deletions
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp index 060baba872a..a3617372b5b 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.cpp +++ b/src/mongo/db/repl/tenant_file_importer_service.cpp @@ -95,16 +95,6 @@ void importCopiedFiles(OperationContext* opCtx, const UUID& migrationId) { }); auto metadatas = wiredTigerRollbackToStableAndGetMetadata(opCtx, tempWTDirectory.string()); - for (auto&& m : metadatas) { - const auto tenantId = parseTenantIdFromDB(m.ns.toStringWithTenantId()); - if (tenantId == boost::none) { - continue; - } - - LOGV2_DEBUG(6114100, 1, "Create recipient access blocker", "tenantId"_attr = tenantId); - addTenantMigrationRecipientAccessBlocker( - opCtx->getServiceContext(), *tenantId, migrationId); - } wiredTigerImportFromBackupCursor(opCtx, metadatas, tempWTDirectory.string()); diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index fc5e62197cf..bfe71cccaed 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -74,6 +74,14 @@ std::shared_ptr<TenantMigrationDonorAccessBlocker> getDonorAccessBlockerForMigra TenantMigrationAccessBlocker::BlockerType::kDonor)); } +std::shared_ptr<TenantMigrationRecipientAccessBlocker> getRecipientAccessBlockerForMigration( + ServiceContext* serviceContext, const UUID& migrationId) { + return checked_pointer_cast<TenantMigrationRecipientAccessBlocker>( + TenantMigrationAccessBlockerRegistry::get(serviceContext) + .getAccessBlockerForMigration(migrationId, + TenantMigrationAccessBlocker::BlockerType::kRecipient)); +} + std::shared_ptr<TenantMigrationDonorAccessBlocker> getTenantMigrationDonorAccessBlocker( ServiceContext* const serviceContext, StringData tenantId) { return checked_pointer_cast<TenantMigrationDonorAccessBlocker>( @@ -88,16 +96,6 @@ std::shared_ptr<TenantMigrationRecipientAccessBlocker> getTenantMigrationRecipie .getTenantMigrationAccessBlockerForTenantId(tenantId, MtabType::kRecipient)); } -void startRejectingReadsBefore(OperationContext* opCtx, mongo::Timestamp ts) { - auto callback = [&](std::string _, std::shared_ptr<TenantMigrationAccessBlocker>& mtab) { - auto recipientMtab = checked_pointer_cast<TenantMigrationRecipientAccessBlocker>(mtab); - recipientMtab->startRejectingReadsBefore(ts); - }; - - TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .applyAll(TenantMigrationAccessBlocker::BlockerType::kRecipient, callback); -} - void addTenantMigrationRecipientAccessBlocker(ServiceContext* serviceContext, const StringData& tenantId, const UUID& migrationId) { diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.h b/src/mongo/db/repl/tenant_migration_access_blocker_util.h index 33459f4beb9..84bb642b410 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.h +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.h @@ -43,6 +43,9 @@ namespace tenant_migration_access_blocker { std::shared_ptr<TenantMigrationDonorAccessBlocker> getDonorAccessBlockerForMigration( ServiceContext* serviceContext, const UUID& migrationId); +std::shared_ptr<TenantMigrationRecipientAccessBlocker> getRecipientAccessBlockerForMigration( + ServiceContext* serviceContext, const UUID& migrationId); + std::shared_ptr<TenantMigrationDonorAccessBlocker> getTenantMigrationDonorAccessBlocker( ServiceContext* serviceContext, StringData tenantId); @@ -50,11 +53,6 @@ std::shared_ptr<TenantMigrationRecipientAccessBlocker> getTenantMigrationRecipie ServiceContext* serviceContext, StringData tenantId); /** - * For "shard merge" protocol: tell all recipient access blockers to reject reads before ts. - */ -void startRejectingReadsBefore(OperationContext* opCtx, mongo::Timestamp ts); - -/** * Add an access blocker if one does not already exist. */ void addTenantMigrationRecipientAccessBlocker(ServiceContext* serviceContext, diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index 2c46e0ef34a..8d5cdc3d291 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -65,8 +65,10 @@ void onSetRejectReadsBeforeTimestamp(OperationContext* opCtx, invariant(mtab); mtab->startRejectingReadsBefore(recipientStateDoc.getRejectReadsBeforeTimestamp().value()); } else { - tenant_migration_access_blocker::startRejectingReadsBefore( - opCtx, recipientStateDoc.getRejectReadsBeforeTimestamp().get()); + auto mtab = tenant_migration_access_blocker::getRecipientAccessBlockerForMigration( + opCtx->getServiceContext(), recipientStateDoc.getId()); + invariant(mtab); + mtab->startRejectingReadsBefore(recipientStateDoc.getRejectReadsBeforeTimestamp().get()); } } @@ -119,6 +121,37 @@ void handleShardMergeStateChange(OperationContext* opCtx, break; } } + +void handleShardMergeDocInsertion(const TenantMigrationRecipientDocument& doc, + OperationContext* opCtx) { + switch (doc.getState()) { + case TenantMigrationRecipientStateEnum::kUninitialized: + case TenantMigrationRecipientStateEnum::kLearnedFilenames: + case TenantMigrationRecipientStateEnum::kConsistent: + uasserted(ErrorCodes::IllegalOperation, + str::stream() << "Inserting the TenantMigrationRecipient document in state " + << TenantMigrationRecipientState_serializer(doc.getState()) + << " is illegal"); + break; + case TenantMigrationRecipientStateEnum::kStarted: { + invariant(doc.getTenantIds()); + auto mtab = std::make_shared<TenantMigrationRecipientAccessBlocker>( + opCtx->getServiceContext(), doc.getId()); + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .add(*doc.getTenantIds(), mtab); + + opCtx->recoveryUnit()->onRollback([opCtx, migrationId = doc.getId()] { + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .removeAccessBlockersForMigration( + migrationId, TenantMigrationAccessBlocker::BlockerType::kRecipient); + }); + } break; + case TenantMigrationRecipientStateEnum::kDone: + break; + default: + MONGO_UNREACHABLE; + } +} } // namespace void TenantMigrationRecipientOpObserver::onCreateCollection(OperationContext* opCtx, @@ -128,50 +161,36 @@ void TenantMigrationRecipientOpObserver::onCreateCollection(OperationContext* op const BSONObj& idIndex, const OplogSlot& createOpTime, bool fromMigrate) { - if (shard_merge_utils::isDonatedFilesCollection(collectionName)) { - auto collString = collectionName.coll().toString(); - auto migrationUUID = - uassertStatusOK(UUID::parse(collString.substr(collString.find('.') + 1))); - auto fileClonerTempDirPath = shard_merge_utils::fileClonerTempDir(migrationUUID); - - // This is possible when a secondary restarts or rollback and the donated files collection - // is created as part of oplog replay. - if (boost::filesystem::exists(fileClonerTempDirPath)) { - LOGV2_DEBUG(6113316, - 1, - "File cloner temp directory already exists", - "directory"_attr = fileClonerTempDirPath.generic_string()); - - // Ignoring the errors because if this step fails, then the following step - // create_directory() will fail and that will throw an exception. - boost::system::error_code ec; - boost::filesystem::remove_all(fileClonerTempDirPath, ec); - } - - try { - boost::filesystem::create_directory(fileClonerTempDirPath); - } catch (std::exception& e) { - LOGV2_ERROR(6113317, - "Error creating file cloner temp directory", - "directory"_attr = fileClonerTempDirPath.generic_string(), - "error"_attr = e.what()); - throw; - } - } else if (!collectionName.isOnInternalDb()) { - const auto& recipientInfo = tenantMigrationInfo(opCtx); - if (!recipientInfo) - return; - - const auto tenantId = tenant_migration_access_blocker::parseTenantIdFromDB( - collectionName.dbName().toStringWithTenantId()); + if (!shard_merge_utils::isDonatedFilesCollection(collectionName)) { + return; + } - tassert( - 6461602, - "Unable to determine tenant id from provided database name"_format(collectionName.db()), - tenantId); + auto collString = collectionName.coll().toString(); + auto migrationUUID = uassertStatusOK(UUID::parse(collString.substr(collString.find('.') + 1))); + auto fileClonerTempDirPath = shard_merge_utils::fileClonerTempDir(migrationUUID); + + // This is possible when a secondary restarts or rollback and the donated files collection + // is created as part of oplog replay. + if (boost::filesystem::exists(fileClonerTempDirPath)) { + LOGV2_DEBUG(6113316, + 1, + "File cloner temp directory already exists", + "directory"_attr = fileClonerTempDirPath.generic_string()); + + // Ignoring the errors because if this step fails, then the following step + // create_directory() will fail and that will throw an exception. + boost::system::error_code ec; + boost::filesystem::remove_all(fileClonerTempDirPath, ec); + } - tenant_migration_access_blocker::addTenantMigrationRecipientAccessBlocker( - opCtx->getServiceContext(), tenantId.get(), recipientInfo->uuid); + try { + boost::filesystem::create_directory(fileClonerTempDirPath); + } catch (std::exception& e) { + LOGV2_ERROR(6113317, + "Error creating file cloner temp directory", + "directory"_attr = fileClonerTempDirPath.generic_string(), + "error"_attr = e.what()); + throw; } } @@ -191,6 +210,11 @@ void TenantMigrationRecipientOpObserver::onInserts( .acquireLock(ServerlessOperationLockRegistry::LockType::kTenantRecipient, recipientStateDoc.getId()); } + + if (auto protocol = recipientStateDoc.getProtocol().value_or(kDefaultMigrationProtocol); + protocol == MigrationProtocolEnum::kShardMerge) { + handleShardMergeDocInsertion(recipientStateDoc, opCtx); + } } } @@ -243,6 +267,8 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, recipientMtab->stopBlockingTTL(); }; + // TODO SERVER-68799 Simplify cleanup logic for shard merge as the tenants share a + // single RTAB TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .applyAll(TenantMigrationAccessBlocker::BlockerType::kRecipient, cleanUpBlockerIfGarbage); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 084b8101167..6215ac54e88 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -274,7 +274,6 @@ public: } }; - } // namespace TenantMigrationRecipientService::TenantMigrationRecipientService( @@ -389,6 +388,7 @@ TenantMigrationRecipientService::Instance::Instance( _stateDoc( TenantMigrationRecipientDocument::parse(IDLParserContext("recipientStateDoc"), stateDoc)), _tenantId(_stateDoc.getTenantId().toString()), + _tenantIds(_stateDoc.getTenantIds() ? *_stateDoc.getTenantIds() : std::vector<TenantId>()), _protocol(_stateDoc.getProtocol().value_or(MigrationProtocolEnum::kMultitenantMigrations)), _migrationUuid(_stateDoc.getId()), _donorConnectionString(_stateDoc.getDonorConnectionString().toString()), @@ -417,6 +417,9 @@ TenantMigrationRecipientService::Instance::Instance( return boost::none; } }()) { + + // Validate the provided tenantIds matches with the protocol + _validateTenantIdsForProtocol(); } boost::optional<BSONObj> TenantMigrationRecipientService::Instance::reportForCurrentOp( @@ -1574,6 +1577,23 @@ void TenantMigrationRecipientService::Instance::_createOplogBuffer(WithLock, } } +void TenantMigrationRecipientService::Instance::_validateTenantIdsForProtocol() { + switch (_protocol) { + case MigrationProtocolEnum::kMultitenantMigrations: + uassert(ErrorCodes::InvalidOptions, + "The field 'tenantIds' must not be set for protocol 'multitenant migration'", + _tenantIds.empty() && !_tenantId.empty()); + break; + case MigrationProtocolEnum::kShardMerge: + uassert(ErrorCodes::InvalidOptions, + "The field 'tenantIds' must be set and non-empty for protocol 'shard merge'", + _tenantId.empty() && !_tenantIds.empty()); + break; + default: + MONGO_UNREACHABLE; + } +} + SemiFuture<void> TenantMigrationRecipientService::Instance::_fetchRetryableWritesOplogBeforeStartOpTime() { _stopOrHangOnFailPoint(&fpAfterRetrievingStartOpTimesMigrationRecipientInstance); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 393c5c77448..7b538872e51 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -428,6 +428,12 @@ public: void _createOplogBuffer(WithLock, OperationContext* opCtx); /** + * Validates the tenantIds field is consistent with the protocol given. Throws an exception + * if there is a mismatch. + */ + void _validateTenantIdsForProtocol(); + + /** * Runs an aggregation that gets the entire oplog chain for every retryable write entry in * `config.transactions`. Only returns oplog entries in the chain where * `ts` < `startFetchingOpTime.ts` and adds them to the oplog buffer. @@ -645,6 +651,7 @@ public: // This data is provided in the initial state doc and never changes. We keep copies to // avoid having to obtain the mutex to access them. const std::string _tenantId; // (R) + const std::vector<TenantId> _tenantIds; // (R) const MigrationProtocolEnum _protocol; // (R) const UUID _migrationUuid; // (R) const std::string _donorConnectionString; // (R) diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp index be6f0bbc799..60815976aaa 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service_shard_merge_test.cpp @@ -50,7 +50,9 @@ #include "mongo/db/repl/primary_only_service_op_observer.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/repl/tenant_migration_access_blocker_registry.h" #include "mongo/db/repl/tenant_migration_recipient_entry_helpers.h" +#include "mongo/db/repl/tenant_migration_recipient_op_observer.h" #include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/wait_for_majority_service.h" @@ -175,6 +177,9 @@ public: opObserverRegistry->addObserver( std::make_unique<PrimaryOnlyServiceOpObserver>(serviceContext)); + // Add OpObserver needed by subclasses. + addOpObserver(opObserverRegistry); + _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext()); std::unique_ptr<TenantMigrationRecipientService> service = std::make_unique<TenantMigrationRecipientService>(getServiceContext()); @@ -280,6 +285,10 @@ protected: size_t _numSecondaryIndexesCreated{0}; size_t _numDocsInserted{0}; + const TenantId _tenantA{OID::gen()}; + const TenantId _tenantB{OID::gen()}; + const std::vector<TenantId> _tenants{_tenantA, _tenantB}; + const TenantMigrationPEMPayload kRecipientPEMPayload = [&] { std::ifstream infile("jstests/libs/client.pem"); std::string buf((std::istreambuf_iterator<char>(infile)), std::istreambuf_iterator<char>()); @@ -372,6 +381,8 @@ protected: } private: + virtual void addOpObserver(OpObserverRegistry* opObserverRegistry){}; + ClockSourceMock _clkSource; unittest::MinimumLoggedSeverityGuard _replicationSeverityGuard{ @@ -438,6 +449,97 @@ BSONObj createServerAggregateReply() { .toBSONAsInitialResponse(); } +/** + * This class adds the TenantMigrationRecipientOpObserver to the main test fixture class. It cannot + * be used in tests after insertion of the state document because the OpObserver uses + * TenantFileImporter service when the state document is updated. This importer is not mocked + * currently and does not work with unit tests as it creates its own thread. + */ +class TenantMigrationRecipientServiceShardMergeTestInsert + : public TenantMigrationRecipientServiceShardMergeTest { +private: + void addOpObserver(OpObserverRegistry* opObserverRegistry) { + opObserverRegistry->addObserver(std::make_unique<TenantMigrationRecipientOpObserver>()); + } +}; + +TEST_F(TenantMigrationRecipientServiceShardMergeTestInsert, + TestBlockersAreInsertedWhenInsertingStateDocument) { + stopFailPointEnableBlock fp("fpBeforeFetchingDonorClusterTimeKeys"); + const UUID migrationUUID = UUID::gen(); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + getTopologyManager()->setTopologyDescription(replSet.getTopologyDescription(clock())); + insertTopOfOplog(&replSet, OpTime(Timestamp(5, 1), 1)); + + // Mock the aggregate response from the donor. + MockRemoteDBServer* const _donorServer = + mongo::MockConnRegistry::get()->getMockRemoteDBServer(replSet.getPrimary()); + _donorServer->setCommandReply("aggregate", createServerAggregateReply()); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + initialStateDocument.setTenantIds(_tenants); + + auto opCtx = makeOperationContext(); + std::shared_ptr<TenantMigrationRecipientService::Instance> instance; + { + auto fp = globalFailPointRegistry().find( + "fpAfterPersistingTenantMigrationRecipientInstanceStateDoc"); + auto initialTimesEntered = fp->setMode(FailPoint::alwaysOn, + 0, + BSON("action" + << "hang")); + instance = TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()); + ASSERT(instance.get()); + + fp->waitForTimesEntered(initialTimesEntered + 1); + + // Test that access blocker exists. + for (const auto& tenantId : _tenants) { + auto blocker = + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .getTenantMigrationAccessBlockerForTenantId( + tenantId.toString(), TenantMigrationAccessBlocker::BlockerType::kRecipient); + ASSERT(!!blocker); + } + fp->setMode(FailPoint::off); + } + + ASSERT_EQ(stopFailPointErrorCode, instance->getDataSyncCompletionFuture().getNoThrow().code()); + ASSERT_OK(instance->getForgetMigrationDurableFuture().getNoThrow()); +} + +TEST_F(TenantMigrationRecipientServiceShardMergeTest, CannotCreateServiceWithoutTenants) { + const UUID migrationUUID = UUID::gen(); + const NamespaceString aggregateNs = NamespaceString("admin.$cmd.aggregate"); + + MockReplicaSet replSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); + + TenantMigrationRecipientDocument initialStateDocument( + migrationUUID, + replSet.getConnectionString(), + "", + kDefaultStartMigrationTimestamp, + ReadPreferenceSetting(ReadPreference::PrimaryOnly)); + initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge); + initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + + auto opCtx = makeOperationContext(); + + ASSERT_THROWS_CODE(TenantMigrationRecipientService::Instance::getOrCreate( + opCtx.get(), _service, initialStateDocument.toBSON()), + DBException, + ErrorCodes::InvalidOptions); +} + TEST_F(TenantMigrationRecipientServiceShardMergeTest, OpenBackupCursorSuccessfully) { stopFailPointEnableBlock fp("fpBeforeAdvancingStableTimestamp"); const UUID migrationUUID = UUID::gen(); @@ -459,11 +561,12 @@ TEST_F(TenantMigrationRecipientServiceShardMergeTest, OpenBackupCursorSuccessful TenantMigrationRecipientDocument initialStateDocument( migrationUUID, replSet.getConnectionString(), - "tenantA", + "", kDefaultStartMigrationTimestamp, ReadPreferenceSetting(ReadPreference::PrimaryOnly)); initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge); initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + initialStateDocument.setTenantIds(_tenants); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; @@ -527,11 +630,12 @@ TEST_F(TenantMigrationRecipientServiceShardMergeTest, OpenBackupCursorAndRetries TenantMigrationRecipientDocument initialStateDocument( migrationUUID, replSet.getConnectionString(), - "tenantA", + "", kDefaultStartMigrationTimestamp, ReadPreferenceSetting(ReadPreference::PrimaryOnly)); initialStateDocument.setProtocol(MigrationProtocolEnum::kShardMerge); initialStateDocument.setRecipientCertificateForDonor(kRecipientPEMPayload); + initialStateDocument.setTenantIds(_tenants); auto opCtx = makeOperationContext(); std::shared_ptr<TenantMigrationRecipientService::Instance> instance; |