summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_file_importer_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_file_importer_service.cpp')
-rw-r--r--src/mongo/db/repl/tenant_file_importer_service.cpp62
1 files changed, 42 insertions, 20 deletions
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp
index 85d95d7e22d..af565c3c713 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.cpp
+++ b/src/mongo/db/repl/tenant_file_importer_service.cpp
@@ -118,14 +118,21 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic
void TenantFileImporterService::startMigration(const UUID& migrationId,
const StringData& donorConnectionString) {
stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) {
+ return;
+ }
+
_reset(lk);
_migrationId = migrationId;
_donorConnectionString = donorConnectionString.toString();
- _eventQueue = std::make_unique<Queue>();
- _state.setState(ImporterState::State::kStarted);
+ _eventQueue = std::make_shared<Queue>();
+ _state = State::kStarted;
- _thread = std::make_unique<stdx::thread>([this] {
+ _thread = std::make_unique<stdx::thread>([this, migrationId] {
Client::initThread("TenantFileImporterService");
+ LOGV2_INFO(6378904,
+ "TenantFileImporterService starting worker thread",
+ "migrationId"_attr = migrationId.toString());
auto opCtx = cc().makeOperationContext();
_handleEvents(opCtx.get());
});
@@ -134,48 +141,55 @@ void TenantFileImporterService::startMigration(const UUID& migrationId,
void TenantFileImporterService::learnedFilename(const UUID& migrationId,
const BSONObj& metadataDoc) {
stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) {
+ return;
+ }
+
tassert(8423347,
"Called learnedFilename with migrationId {}, but {} is active"_format(
migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"),
migrationId == _migrationId);
- _state.setState(ImporterState::State::kLearnedFilename);
+ _state = State::kLearnedFilename;
ImporterEvent event{ImporterEvent::Type::kLearnedFileName, migrationId};
event.metadataDoc = metadataDoc.getOwned();
+ invariant(_eventQueue);
auto success = _eventQueue->tryPush(std::move(event));
- uassert(6378904,
+ uassert(6378903,
"TenantFileImporterService failed to push '{}' event without blocking"_format(
- _state.toString()),
+ stateToString(_state)),
success);
}
void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) {
stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) {
+ return;
+ }
+
tassert(8423345,
"Called learnedAllFilenames with migrationId {}, but {} is active"_format(
migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"),
migrationId == _migrationId);
- _state.setState(ImporterState::State::kLearnedAllFilenames);
+ _state = State::kLearnedAllFilenames;
+ invariant(_eventQueue);
auto success = _eventQueue->tryPush({ImporterEvent::Type::kLearnedAllFilenames, migrationId});
- uassert(6378905,
+ uassert(6378902,
"TenantFileImporterService failed to push '{}' event without blocking"_format(
- _state.toString()),
+ stateToString(_state)),
success);
}
void TenantFileImporterService::interrupt(const UUID& migrationId) {
stdx::lock_guard lk(_mutex);
- if (!_migrationId) {
- return;
- }
if (migrationId != _migrationId) {
LOGV2_WARNING(
- 6378907,
+ 6378901,
"Called interrupt with migrationId {migrationId}, but {activeMigrationId} is active",
"migrationId"_attr = migrationId.toString(),
- "activeMigrationId"_attr = _migrationId->toString());
+ "activeMigrationId"_attr = _migrationId ? _migrationId->toString() : "no migration");
return;
}
_interrupt(lk);
@@ -195,8 +209,11 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
std::string donorConnectionString;
boost::optional<UUID> migrationId;
+ std::shared_ptr<Queue> eventQueueRef;
{
stdx::lock_guard lk(_mutex);
+ invariant(_eventQueue);
+ eventQueueRef = _eventQueue;
donorConnectionString = _donorConnectionString;
migrationId = _migrationId;
}
@@ -206,9 +223,9 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
opCtx->checkForInterrupt();
try {
- event = _eventQueue->pop(opCtx);
+ event = eventQueueRef->pop(opCtx);
} catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) {
- LOGV2_WARNING(6378908, "Event queue was interrupted", "error"_attr = err);
+ LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err);
break;
}
@@ -259,7 +276,7 @@ void TenantFileImporterService::_voteImportedFiles(OperationContext* opCtx) {
}
void TenantFileImporterService::_interrupt(WithLock) {
- if (_state.is(ImporterState::State::kInterrupted)) {
+ if (_state == State::kInterrupted) {
return;
}
@@ -276,11 +293,16 @@ void TenantFileImporterService::_interrupt(WithLock) {
// _opCtx->markKilled(ErrorCodes::Interrupted);
}
- _state.setState(ImporterState::State::kInterrupted);
+ _state = State::kInterrupted;
}
void TenantFileImporterService::_reset(WithLock) {
- _migrationId.reset();
+ if (_migrationId) {
+ LOGV2_INFO(6378905,
+ "TenantFileImporterService resetting migration",
+ "migrationId"_attr = _migrationId->toString());
+ _migrationId.reset();
+ }
if (_thread && _thread->joinable()) {
_thread->join();
@@ -292,6 +314,6 @@ void TenantFileImporterService::_reset(WithLock) {
}
// TODO SERVER-66907: how should we be resetting _opCtx?
- _state.setState(ImporterState::State::kUninitialized);
+ _state = State::kUninitialized;
}
} // namespace mongo::repl