diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_file_importer_service.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_file_importer_service.cpp | 62 |
1 files changed, 42 insertions, 20 deletions
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp index 85d95d7e22d..af565c3c713 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.cpp +++ b/src/mongo/db/repl/tenant_file_importer_service.cpp @@ -118,14 +118,21 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic void TenantFileImporterService::startMigration(const UUID& migrationId, const StringData& donorConnectionString) { stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) { + return; + } + _reset(lk); _migrationId = migrationId; _donorConnectionString = donorConnectionString.toString(); - _eventQueue = std::make_unique<Queue>(); - _state.setState(ImporterState::State::kStarted); + _eventQueue = std::make_shared<Queue>(); + _state = State::kStarted; - _thread = std::make_unique<stdx::thread>([this] { + _thread = std::make_unique<stdx::thread>([this, migrationId] { Client::initThread("TenantFileImporterService"); + LOGV2_INFO(6378904, + "TenantFileImporterService starting worker thread", + "migrationId"_attr = migrationId.toString()); auto opCtx = cc().makeOperationContext(); _handleEvents(opCtx.get()); }); @@ -134,48 +141,55 @@ void TenantFileImporterService::startMigration(const UUID& migrationId, void TenantFileImporterService::learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc) { stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) { + return; + } + tassert(8423347, "Called learnedFilename with migrationId {}, but {} is active"_format( migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"), migrationId == _migrationId); - _state.setState(ImporterState::State::kLearnedFilename); + _state = State::kLearnedFilename; ImporterEvent event{ImporterEvent::Type::kLearnedFileName, migrationId}; event.metadataDoc = metadataDoc.getOwned(); + invariant(_eventQueue); auto success = _eventQueue->tryPush(std::move(event)); - uassert(6378904, + uassert(6378903, "TenantFileImporterService failed to push '{}' event without blocking"_format( - _state.toString()), + stateToString(_state)), success); } void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) { stdx::lock_guard lk(_mutex); + if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) { + return; + } + tassert(8423345, "Called learnedAllFilenames with migrationId {}, but {} is active"_format( migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"), migrationId == _migrationId); - _state.setState(ImporterState::State::kLearnedAllFilenames); + _state = State::kLearnedAllFilenames; + invariant(_eventQueue); auto success = _eventQueue->tryPush({ImporterEvent::Type::kLearnedAllFilenames, migrationId}); - uassert(6378905, + uassert(6378902, "TenantFileImporterService failed to push '{}' event without blocking"_format( - _state.toString()), + stateToString(_state)), success); } void TenantFileImporterService::interrupt(const UUID& migrationId) { stdx::lock_guard lk(_mutex); - if (!_migrationId) { - return; - } if (migrationId != _migrationId) { LOGV2_WARNING( - 6378907, + 6378901, "Called interrupt with migrationId {migrationId}, but {activeMigrationId} is active", "migrationId"_attr = migrationId.toString(), - "activeMigrationId"_attr = _migrationId->toString()); + "activeMigrationId"_attr = _migrationId ? _migrationId->toString() : "no migration"); return; } _interrupt(lk); @@ -195,8 +209,11 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) { std::string donorConnectionString; boost::optional<UUID> migrationId; + std::shared_ptr<Queue> eventQueueRef; { stdx::lock_guard lk(_mutex); + invariant(_eventQueue); + eventQueueRef = _eventQueue; donorConnectionString = _donorConnectionString; migrationId = _migrationId; } @@ -206,9 +223,9 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) { opCtx->checkForInterrupt(); try { - event = _eventQueue->pop(opCtx); + event = eventQueueRef->pop(opCtx); } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) { - LOGV2_WARNING(6378908, "Event queue was interrupted", "error"_attr = err); + LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err); break; } @@ -259,7 +276,7 @@ void TenantFileImporterService::_voteImportedFiles(OperationContext* opCtx) { } void TenantFileImporterService::_interrupt(WithLock) { - if (_state.is(ImporterState::State::kInterrupted)) { + if (_state == State::kInterrupted) { return; } @@ -276,11 +293,16 @@ void TenantFileImporterService::_interrupt(WithLock) { // _opCtx->markKilled(ErrorCodes::Interrupted); } - _state.setState(ImporterState::State::kInterrupted); + _state = State::kInterrupted; } void TenantFileImporterService::_reset(WithLock) { - _migrationId.reset(); + if (_migrationId) { + LOGV2_INFO(6378905, + "TenantFileImporterService resetting migration", + "migrationId"_attr = _migrationId->toString()); + _migrationId.reset(); + } if (_thread && _thread->joinable()) { _thread->join(); @@ -292,6 +314,6 @@ void TenantFileImporterService::_reset(WithLock) { } // TODO SERVER-66907: how should we be resetting _opCtx? - _state.setState(ImporterState::State::kUninitialized); + _state = State::kUninitialized; } } // namespace mongo::repl |