diff options
author | Christopher Caplinger <christopher.caplinger@mongodb.com> | 2022-06-23 15:58:45 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-23 16:34:43 +0000 |
commit | 0642839041dcab4ec72395259544c498cc2e3dc5 (patch) | |
tree | 1a40d46f90f94159e49ac6ce2a4175d374acf5bf | |
parent | a06bc8bbced8f0c60b94ed784f5f105f2f01ed5d (diff) | |
download | mongo-0642839041dcab4ec72395259544c498cc2e3dc5.tar.gz |
SERVER-63789 Async file copy/import
6 files changed, 280 insertions, 257 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js index b12512b2b8d..40d73eb5884 100644 --- a/jstests/replsets/libs/tenant_migration_test.js +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -343,6 +343,10 @@ function TenantMigrationTest({ donorNodes = donorNodes || donorRst.nodes; recipientNodes = recipientNodes || recipientRst.nodes; + if (typeof migrationId === "string") { + migrationId = UUID(migrationId); + } + donorNodes.forEach(node => { const configDonorsColl = node.getCollection(TenantMigrationTest.kConfigDonorsNS); assert.soon(() => 0 === configDonorsColl.count({_id: migrationId}), tojson(node)); diff --git a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js index d5055bb0337..c4ca54443af 100644 --- a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js +++ b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js @@ -34,21 +34,15 @@ if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { return; } -if (TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { - // TODO SERVER-63789: Re-enable this test for Shard Merge - tenantMigrationTest.stop(); - jsTestLog("Temporarily skipping Shard Merge test, dependent on SERVER-63789."); - return; -} - const kDataDir = `${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`; assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0); +const dbName = "myDatabase"; + (function() { jsTestLog("Generate test data"); -const dbName = "myDatabase"; const db = donorPrimary.getDB(dbName); const collection = db["myCollection"]; const capped = db["myCappedCollection"]; @@ -69,7 +63,6 @@ configureFailPoint( recipientPrimary, "WTWriteConflictExceptionForImportCollection", {} /* data */, {times: 1}); configureFailPoint( recipientPrimary, "WTWriteConflictExceptionForImportIndex", {} /* data */, {times: 1}); -configureFailPoint(recipientPrimary, "skipDeleteTempDBPath"); jsTestLog("Run migration"); // The old multitenant migrations won't copy myDatabase since it doesn't start with testTenantId, diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp index 9d1076eb179..af565c3c713 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.cpp +++ b/src/mongo/db/repl/tenant_file_importer_service.cpp @@ -34,6 +34,7 @@ #include <fmt/format.h> #include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/repl/replication_coordinator.h" @@ -54,15 +55,8 @@ namespace mongo::repl { using namespace fmt::literals; using namespace shard_merge_utils; using namespace tenant_migration_access_blocker; -using executor::NetworkInterface; -using executor::NetworkInterfaceThreadPool; -using executor::TaskExecutor; -using executor::ThreadPoolTaskExecutor; namespace { - -MONGO_FAIL_POINT_DEFINE(skipDeleteTempDBPath); - const auto _TenantFileImporterService = ServiceContext::declareDecoration<TenantFileImporterService>(); @@ -70,7 +64,7 @@ const ReplicaSetAwareServiceRegistry::Registerer<TenantFileImporterService> _TenantFileImporterServiceRegisterer("TenantFileImporterService"); void importCopiedFiles(OperationContext* opCtx, - const UUID& migrationId, + UUID& migrationId, const StringData& donorConnectionString) { auto tempWTDirectory = fileClonerTempDir(migrationId); uassert(6113315, @@ -80,12 +74,6 @@ void importCopiedFiles(OperationContext* opCtx, // TODO SERVER-63204: Evaluate correct place to remove the temporary WT dbpath. ON_BLOCK_EXIT([&tempWTDirectory, &migrationId] { - // TODO SERVER-63789: Delete skipDeleteTempDBPath failpoint - if (MONGO_unlikely(skipDeleteTempDBPath.shouldFail())) { - LOGV2(6114402, - "skipDeleteTempDBPath failpoint enabled, skipping temp directory cleanup."); - return; - } LOGV2_INFO(6113324, "Done importing files, removing the temporary WT dbpath", "migrationId"_attr = migrationId, @@ -113,6 +101,7 @@ void importCopiedFiles(OperationContext* opCtx, auto catalog = CollectionCatalog::get(opCtx); for (auto&& m : metadatas) { + AutoGetDb dbLock(opCtx, m.ns.db(), MODE_IX); Lock::CollectionLock systemViewsLock( opCtx, NamespaceString(m.ns.dbName(), NamespaceString::kSystemDotViewsCollectionName), @@ -126,142 +115,205 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic return &_TenantFileImporterService(serviceContext); } -void TenantFileImporterService::onStartup(OperationContext*) { - auto net = executor::makeNetworkInterface("TenantFileImporterService-TaskExecutor"); - auto pool = std::make_unique<executor::NetworkInterfaceThreadPool>(net.get()); - _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); - _executor->startup(); -} - void TenantFileImporterService::startMigration(const UUID& migrationId, const StringData& donorConnectionString) { stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) { + return; + } + _reset(lk); _migrationId = migrationId; _donorConnectionString = donorConnectionString.toString(); - _scopedExecutor = std::make_shared<executor::ScopedTaskExecutor>( - _executor, - Status{ErrorCodes::CallbackCanceled, "TenantFileImporterService executor cancelled"}); - _state.setState(ImporterState::State::kCopyingFiles); + _eventQueue = std::make_shared<Queue>(); + _state = State::kStarted; + + _thread = std::make_unique<stdx::thread>([this, migrationId] { + Client::initThread("TenantFileImporterService"); + LOGV2_INFO(6378904, + "TenantFileImporterService starting worker thread", + "migrationId"_attr = migrationId.toString()); + auto opCtx = cc().makeOperationContext(); + _handleEvents(opCtx.get()); + }); } void TenantFileImporterService::learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc) { - auto opCtx = cc().getOperationContext(); - { - stdx::lock_guard lk(_mutex); - uassert(8423347, - "Called learnedFilename with migrationId {}, but {} is active"_format( - migrationId.toString(), _migrationId ? _migrationId->toString() : "(null)"), - migrationId == _migrationId); + stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) { + return; } - try { - // TODO (SERVER-62734): Do this work asynchronously on the executor. - cloneFile(opCtx, metadataDoc); - } catch (const DBException& ex) { - LOGV2_ERROR(6229306, - "Error cloning files", - "migrationUUID"_attr = migrationId, - "error"_attr = ex.toStatus()); - // TODO (SERVER-63390): On error, vote shard merge abort to recipient primary. - } + tassert(8423347, + "Called learnedFilename with migrationId {}, but {} is active"_format( + migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"), + migrationId == _migrationId); + + _state = State::kLearnedFilename; + ImporterEvent event{ImporterEvent::Type::kLearnedFileName, migrationId}; + event.metadataDoc = metadataDoc.getOwned(); + invariant(_eventQueue); + auto success = _eventQueue->tryPush(std::move(event)); + + uassert(6378903, + "TenantFileImporterService failed to push '{}' event without blocking"_format( + stateToString(_state)), + success); } void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) { + stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) { + return; + } + + tassert(8423345, + "Called learnedAllFilenames with migrationId {}, but {} is active"_format( + migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"), + migrationId == _migrationId); + + _state = State::kLearnedAllFilenames; + invariant(_eventQueue); + auto success = _eventQueue->tryPush({ImporterEvent::Type::kLearnedAllFilenames, migrationId}); + uassert(6378902, + "TenantFileImporterService failed to push '{}' event without blocking"_format( + stateToString(_state)), + success); +} + +void TenantFileImporterService::interrupt(const UUID& migrationId) { + stdx::lock_guard lk(_mutex); + if (migrationId != _migrationId) { + LOGV2_WARNING( + 6378901, + "Called interrupt with migrationId {migrationId}, but {activeMigrationId} is active", + "migrationId"_attr = migrationId.toString(), + "activeMigrationId"_attr = _migrationId ? _migrationId->toString() : "no migration"); + return; + } + _interrupt(lk); +} + +void TenantFileImporterService::interruptAll() { + stdx::lock_guard lk(_mutex); + if (!_migrationId) { + return; + } + _interrupt(lk); +} + +void TenantFileImporterService::_handleEvents(OperationContext* opCtx) { + using eventType = ImporterEvent::Type; + std::string donorConnectionString; + boost::optional<UUID> migrationId; + + std::shared_ptr<Queue> eventQueueRef; { stdx::lock_guard lk(_mutex); - if (!_state.is(ImporterState::State::kCopyingFiles)) { - return; + invariant(_eventQueue); + eventQueueRef = _eventQueue; + donorConnectionString = _donorConnectionString; + migrationId = _migrationId; + } + + ImporterEvent event{eventType::kNone, UUID::gen()}; + while (true) { + opCtx->checkForInterrupt(); + + try { + event = eventQueueRef->pop(opCtx); + } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) { + LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err); + break; } - uassert(8423345, - "Called learnedAllFilenames with migrationId {}, but {} is active"_format( - migrationId.toString(), _migrationId ? _migrationId->toString() : "(null)"), - migrationId == _migrationId); + // Out-of-order events for a different migration are not permitted. + invariant(event.migrationId == migrationId); + + switch (event.type) { + case eventType::kNone: + continue; + case eventType::kLearnedFileName: + cloneFile(opCtx, event.metadataDoc); + continue; + case eventType::kLearnedAllFilenames: + importCopiedFiles(opCtx, event.migrationId, donorConnectionString); + _voteImportedFiles(opCtx); + break; + } + break; + } +} - _state.setState(ImporterState::State::kCopiedFiles); - donorConnectionString = _donorConnectionString; +void TenantFileImporterService::_voteImportedFiles(OperationContext* opCtx) { + boost::optional<UUID> migrationId; + { + stdx::lock_guard lk(_mutex); + migrationId = _migrationId; } + invariant(migrationId); + + auto replCoord = ReplicationCoordinator::get(getGlobalServiceContext()); - auto opCtx = cc().getOperationContext(); - // TODO SERVER-63789: Revisit use of AllowLockAcquisitionOnTimestampedUnitOfWork and - // remove if possible. - // No other threads will try to acquire conflicting locks: we are acquiring - // database/collection locks for new tenants. - AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + RecipientVoteImportedFiles cmd(*migrationId, replCoord->getMyHostAndPort(), true /* success */); - importCopiedFiles(opCtx, migrationId, donorConnectionString); + auto voteResponse = replCoord->runCmdOnPrimaryAndAwaitResponse( + opCtx, + NamespaceString::kAdminDb.toString(), + cmd.toBSON({}), + [](executor::TaskExecutor::CallbackHandle handle) {}, + [](executor::TaskExecutor::CallbackHandle handle) {}); - // TODO (SERVER-62734): Keep count of files remaining to import, wait before voting. - stdx::lock_guard lk(_mutex); - if (!_state.is(ImporterState::State::kCopiedFiles) || migrationId != _migrationId) { - LOGV2_INFO(6114103, - "Not calling recipientVoteImportedFiles: migration ended", - "currentMigrationId"_attr = _migrationId, - "previousMigrationId"_attr = migrationId); - return; + auto voteStatus = getStatusFromCommandResult(voteResponse); + if (!voteStatus.isOK()) { + LOGV2_WARNING(6113403, + "Failed to run recipientVoteImportedFiles command on primary", + "status"_attr = voteStatus); + // TODO SERVER-64192: handle this case, retry, and/or throw error, etc. } - _voteImportedFiles(migrationId, lk); - _state.setState(ImporterState::State::kImportedFiles); } -void TenantFileImporterService::reset(const UUID& migrationId) { - stdx::lock_guard lk(_mutex); - if (migrationId != _migrationId) { - LOGV2_DEBUG(6114106, - 1, - "Ignoring reset for unknown migrationId", - "currentMigrationId"_attr = _migrationId, - "unknownMigrationId"_attr = migrationId); +void TenantFileImporterService::_interrupt(WithLock) { + if (_state == State::kInterrupted) { return; } - _reset(lk); -} -void TenantFileImporterService::_voteImportedFiles(const UUID& migrationId, WithLock) { - auto replCoord = ReplicationCoordinator::get(getGlobalServiceContext()); - // Call the command on the primary (which is self if this node is primary). - auto primary = replCoord->getCurrentPrimaryHostAndPort(); - if (primary.empty()) { - LOGV2_WARNING( - 6113406, - "No primary for recipientVoteImportedFiles command, cannot continue migration", - "migrationId"_attr = migrationId); - return; + // TODO SERVER-66150: interrupt the tenant file cloner by closing the dbClientConnnection via + // shutdownAndDisallowReconnect() and shutting down the writer pool. + if (_eventQueue) { + _eventQueue->closeConsumerEnd(); } - RecipientVoteImportedFiles cmd(migrationId, replCoord->getMyHostAndPort(), true /* success */); - executor::RemoteCommandRequest request(primary, "admin", cmd.toBSON({}), nullptr); - request.sslMode = transport::kGlobalSSLMode; - auto scheduleResult = - (*_scopedExecutor) - ->scheduleRemoteCommand( - request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - if (!args.response.isOK()) { - LOGV2_WARNING(6113405, - "recipientVoteImportedFiles command failed", - "error"_attr = redact(args.response.status)); - return; - } - auto status = getStatusFromCommandResult(args.response.data); - if (!status.isOK()) { - LOGV2_WARNING(6113404, - "recipientVoteImportedFiles command failed", - "error"_attr = redact(status)); - } - }); - if (!scheduleResult.isOK()) { - LOGV2_WARNING(6113403, - "Failed to schedule recipientVoteImportedFiles command on primary", - "status"_attr = scheduleResult.getStatus()); + { + // TODO SERVER-66907: Uncomment op ctx interrupt logic. + // OperationContext* ptr = _opCtx.get(); + // stdx::lock_guard<Client> lk(*ptr->getClient()); + // _opCtx->markKilled(ErrorCodes::Interrupted); } + + _state = State::kInterrupted; } void TenantFileImporterService::_reset(WithLock) { - _scopedExecutor.reset(); // Shuts down and joins the executor. - _migrationId.reset(); - _state.setState(ImporterState::State::kUninitialized); + if (_migrationId) { + LOGV2_INFO(6378905, + "TenantFileImporterService resetting migration", + "migrationId"_attr = _migrationId->toString()); + _migrationId.reset(); + } + + if (_thread && _thread->joinable()) { + _thread->join(); + _thread.reset(); + } + + if (_eventQueue) { + _eventQueue.reset(); + } + + // TODO SERVER-66907: how should we be resetting _opCtx? + _state = State::kUninitialized; } } // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h index 92aa2baa426..d7188f9a0e6 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.h +++ b/src/mongo/db/repl/tenant_file_importer_service.h @@ -33,9 +33,8 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/replica_set_aware_service.h" -#include "mongo/executor/scoped_task_executor.h" -#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/stdx/mutex.h" +#include "mongo/util/producer_consumer_queue.h" #include "mongo/util/string_map.h" #include "mongo/util/uuid.h" @@ -49,107 +48,84 @@ public: void startMigration(const UUID& migrationId, const StringData& donorConnectionString); void learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc); void learnedAllFilenames(const UUID& migrationId); - void reset(const UUID& migrationId); + void interrupt(const UUID& migrationId); + void interruptAll(); private: - void onStartup(OperationContext* opCtx) final; - - void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) final {} + void onInitialDataAvailable(OperationContext*, bool) final {} void onShutdown() final { - stdx::lock_guard lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); + _interrupt(lk); _reset(lk); } - void onStepUpBegin(OperationContext* opCtx, long long term) final { - stdx::lock_guard lk(_mutex); - _reset(lk); - } + void onStartup(OperationContext*) final {} - void onStepUpComplete(OperationContext* opCtx, long long term) final { - stdx::lock_guard lk(_mutex); - _reset(lk); - } + void onStepUpBegin(OperationContext*, long long) final {} - void onStepDown() final { - stdx::lock_guard lk(_mutex); - _reset(lk); - } + void onStepUpComplete(OperationContext*, long long) final {} - void onBecomeArbiter() final { - stdx::lock_guard lk(_mutex); - _reset(lk); - } + void onStepDown() final {} + + void onBecomeArbiter() final {} + + void _handleEvents(OperationContext* opCtx); - void _voteImportedFiles(const UUID& migrationId, WithLock); + void _voteImportedFiles(OperationContext* opCtx); + + void _interrupt(WithLock); void _reset(WithLock); - // Lasts for the lifetime of the process. - std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; - // Wraps _executor. Created by learnedFilename and destroyed by _reset. - std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; + std::unique_ptr<stdx::thread> _thread; boost::optional<UUID> _migrationId; std::string _donorConnectionString; Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex"); - class ImporterState { - public: - enum class State { kUninitialized, kCopyingFiles, kCopiedFiles, kImportedFiles }; - - void setState(State nextState) { - tassert(6114403, - str::stream() << "current state: " << toString(_state) - << ", new state: " << toString(nextState), - isValidTransition(nextState)); - _state = nextState; - } - - bool is(State state) const { - return _state == state; - } + // Explicit State enum ordering defined here because we rely on comparison + // operators for state checking in various TenantFileImporterService methods. + enum class State { + kUninitialized = 0, + kStarted = 1, + kLearnedFilename = 2, + kLearnedAllFilenames = 3, + kInterrupted = 4 + }; - StringData toString() const { - return toString(_state); + static StringData stateToString(State state) { + switch (state) { + case State::kUninitialized: + return "uninitialized"; + case State::kStarted: + return "started"; + case State::kLearnedFilename: + return "learned filename"; + case State::kLearnedAllFilenames: + return "learned all filenames"; + case State::kInterrupted: + return "interrupted"; } + MONGO_UNREACHABLE; + return StringData(); + } - private: - static StringData toString(State value) { - switch (value) { - case State::kUninitialized: - return "uninitialized"; - case State::kCopyingFiles: - return "copying files"; - case State::kCopiedFiles: - return "copied files"; - case State::kImportedFiles: - return "imported files"; - } - MONGO_UNREACHABLE; - return StringData(); - } + State _state; - bool isValidTransition(State newState) { - if (_state == newState) { - return true; - } - - switch (_state) { - case State::kUninitialized: - return newState == State::kCopyingFiles; - case State::kCopyingFiles: - return newState == State::kCopiedFiles || newState == State::kUninitialized; - case State::kCopiedFiles: - return newState == State::kImportedFiles || newState == State::kUninitialized; - case State::kImportedFiles: - return newState == State::kUninitialized; - } - MONGO_UNREACHABLE; - } + struct ImporterEvent { + enum class Type { kNone, kLearnedFileName, kLearnedAllFilenames }; + Type type; + UUID migrationId; + BSONObj metadataDoc; - State _state = State::kUninitialized; + ImporterEvent(Type _type, const UUID& _migrationId) + : type(_type), migrationId(_migrationId) {} }; - ImporterState _state; + using Queue = + MultiProducerSingleConsumerQueue<ImporterEvent, + producer_consumer_queue_detail::DefaultCostFunction>; + + std::shared_ptr<Queue> _eventQueue; }; } // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index 26f8d75ba3c..4cfdb60b43c 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -155,17 +155,12 @@ void TenantMigrationRecipientOpObserver::onInserts( return; } - try { - auto fileImporter = repl::TenantFileImporterService::get(opCtx->getServiceContext()); - for (auto it = first; it != last; it++) { - const auto& metadataDoc = it->doc; - auto migrationId = - uassertStatusOK(UUID::parse(metadataDoc[shard_merge_utils::kMigrationIdFieldName])); - fileImporter->learnedFilename(migrationId, metadataDoc); - } - } catch (const DBException& exc) { - LOGV2_ERROR( - 8423349, "TenantMigrationRecipientOpObserver::onInserts", "exception"_attr = exc); + auto fileImporter = repl::TenantFileImporterService::get(opCtx->getServiceContext()); + for (auto it = first; it != last; it++) { + const auto& metadataDoc = it->doc; + auto migrationId = + uassertStatusOK(UUID::parse(metadataDoc[shard_merge_utils::kMigrationIdFieldName])); + fileImporter->learnedFilename(migrationId, metadataDoc); } } @@ -182,8 +177,8 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, if (recipientStateDoc.getExpireAt() && mtab) { if (mtab->inStateReject()) { // The TenantMigrationRecipientAccessBlocker entry needs to be removed to - // re-allow reads and future migrations with the same tenantId as this migration - // has already been aborted and forgotten. + // re-allow reads and future migrations with the same tenantId as this + // migration has already been aborted and forgotten. TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .remove(recipientStateDoc.getTenantId(), TenantMigrationAccessBlocker::BlockerType::kRecipient); @@ -209,9 +204,6 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, break; case TenantMigrationRecipientStateEnum::kStarted: createAccessBlockerIfNeeded(opCtx, recipientStateDoc); - repl::TenantFileImporterService::get(opCtx->getServiceContext()) - ->startMigration(recipientStateDoc.getId(), - recipientStateDoc.getDonorConnectionString()); break; case TenantMigrationRecipientStateEnum::kLearnedFilenames: break; @@ -221,34 +213,36 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, } break; case TenantMigrationRecipientStateEnum::kDone: - repl::TenantFileImporterService::get(opCtx->getServiceContext()) - ->reset(recipientStateDoc.getId()); break; } - }); - // Perform TenantFileImporterService::learnedAllFilenames work outside of the above onCommit - // hook because of work done in a WriteUnitOfWork. - // TODO SERVER-63789: Revisit this when we make file import async and move - // within onCommit hook. - auto state = recipientStateDoc.getState(); - auto protocol = recipientStateDoc.getProtocol().value_or(kDefaultMigrationProtocol); - if (state == TenantMigrationRecipientStateEnum::kLearnedFilenames) { - tassert(6114400, - "Bad state '{}' for protocol '{}'"_format( - TenantMigrationRecipientState_serializer(state), - MigrationProtocol_serializer(protocol)), - protocol == MigrationProtocolEnum::kShardMerge); - - try { + if (protocol != MigrationProtocolEnum::kShardMerge) { + return; + } + + if (recipientStateDoc.getExpireAt() && mtab) { repl::TenantFileImporterService::get(opCtx->getServiceContext()) - ->learnedAllFilenames(recipientStateDoc.getId()); - } catch (const DBException& exc) { - LOGV2_ERROR(6114104, - "Calling TenantFileImporterService::learnedAllFilenames", - "exception"_attr = exc); + ->interrupt(recipientStateDoc.getId()); } - } + + auto fileImporter = repl::TenantFileImporterService::get(opCtx->getServiceContext()); + + switch (state) { + case TenantMigrationRecipientStateEnum::kUninitialized: + break; + case TenantMigrationRecipientStateEnum::kStarted: + fileImporter->startMigration(recipientStateDoc.getId(), + recipientStateDoc.getDonorConnectionString()); + break; + case TenantMigrationRecipientStateEnum::kLearnedFilenames: + fileImporter->learnedAllFilenames(recipientStateDoc.getId()); + break; + case TenantMigrationRecipientStateEnum::kConsistent: + break; + case TenantMigrationRecipientStateEnum::kDone: + break; + } + }); } } @@ -301,9 +295,12 @@ void TenantMigrationRecipientOpObserver::onDelete(OperationContext* opCtx, LOGV2_INFO(6114101, "Removing expired 'shard merge' migration", "migrationId"_attr = migrationId); - TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .removeRecipientAccessBlockersForMigration(migrationId); - repl::TenantFileImporterService::get(opCtx->getServiceContext())->reset(migrationId); + opCtx->recoveryUnit()->onCommit([opCtx, migrationId](boost::optional<Timestamp>) { + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .removeRecipientAccessBlockersForMigration(migrationId); + repl::TenantFileImporterService::get(opCtx->getServiceContext()) + ->interrupt(migrationId); + }); } } } @@ -318,6 +315,8 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection( opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient); + + repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll(); }); } return {}; diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp index 3d5deee7ce2..b683fe3aebf 100644 --- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp +++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp @@ -176,9 +176,7 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, invariant(db); Lock::CollectionLock collLock(opCtx, nss, MODE_X); auto catalog = CollectionCatalog::get(opCtx); - // TODO SERVER-63789 Uncomment WriteUnitOfWork declaration below when we - // make file import async. - // WriteUnitOfWork wunit(opCtx); + WriteUnitOfWork wunit(opCtx); AutoStatsTracker statsTracker(opCtx, nss, Top::LockType::NotLocked, @@ -218,9 +216,8 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, makeCountsChange(ownedCollection->getRecordStore(), collectionMetadata)); CollectionCatalog::get(opCtx)->onCreateCollection(opCtx, std::move(ownedCollection)); - // TODO SERVER-63789 Uncomment wunit.commit() call below when we - // make file copy/import async. - // wunit.commit(); + wunit.commit(); + LOGV2(6114300, "Imported donor collection", "ns"_attr = nss, @@ -240,23 +237,25 @@ void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) { makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd); ON_BLOCK_EXIT([&] { - client->shutdownAndDisallowReconnect(); - + if (client) { + client->shutdownAndDisallowReconnect(); + } writerPool->shutdown(); writerPool->join(); }); auto fileName = metadataDoc["filename"].str(); auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName]))); - LOGV2_DEBUG(6113320, - 1, - "Cloning file", - "migrationId"_attr = migrationId, - "metadata"_attr = metadataDoc); auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName]))); auto remoteDbpath = metadataDoc["remoteDbpath"].str(); size_t fileSize = std::max(0ll, metadataDoc["fileSize"].safeNumberLong()); auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str()); + LOGV2_DEBUG(6113320, + 1, + "Cloning file", + "migrationId"_attr = migrationId, + "metadata"_attr = metadataDoc, + "destinationRelativePath"_attr = relativePath); invariant(!relativePath.empty()); // Connect the client. |