diff options
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 63 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.h | 5 |
2 files changed, 40 insertions, 28 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 6911f5509ba..77f469aa819 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -940,9 +940,8 @@ void TenantMigrationRecipientService::Instance::_killBackupCursor(WithLock lk) { "Failed to run killCursors command on backup cursor", "status"_attr = scheduleResult.getStatus()); } - - _backupCursorId = 0; } + ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetcher( const CancellationToken& token) { stdx::lock_guard lk(_mutex); @@ -965,7 +964,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch auto returnedFiles = std::make_shared<std::vector<BSONObj>>(); auto fetchStatus = std::make_shared<boost::optional<Status>>(); - auto fetcherCallback = [this, fetchStatus, returnedFiles, token]( + auto fetcherCallback = [this, self = shared_from_this(), fetchStatus, returnedFiles, token]( const Fetcher::QueryResponseStatus& dataStatus, Fetcher::NextAction* nextAction, BSONObjBuilder* getMoreBob) { @@ -1027,7 +1026,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch getMoreBob->append("collection", data.nss.coll()); }; - auto fetcher = std::make_shared<Fetcher>( + _backupCursorFileFetcher = std::make_unique<Fetcher>( (**_scopedExecutor).get(), _client->getServerHostAndPort(), NamespaceString::kAdminDb.toString(), @@ -1040,26 +1039,28 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch kBackupCursorFileFetcherRetryAttempts, executor::RemoteCommandRequest::kNoTimeout), transport::kGlobalSSLMode); - uassertStatusOK(fetcher->schedule()); + uassertStatusOK(_backupCursorFileFetcher->schedule()); - return fetcher->onCompletion().thenRunOn(**_scopedExecutor).then([this, fetcher, fetchStatus] { - if (!*fetchStatus) { - // the callback was never invoked - uasserted(6113007, "Internal error running cursor callback in command"); - } + return _backupCursorFileFetcher->onCompletion() + .thenRunOn(**_scopedExecutor) + .then([this, self = shared_from_this(), fetchStatus] { + if (!*fetchStatus) { + // the callback was never invoked + uasserted(6113007, "Internal error running cursor callback in command"); + } - auto status = fetchStatus->get(); - if (!status.isOK() && status.code() != 50915) { - // In the event of 50915: A checkpoint took place while - // opening a backup cursor, we should retry and *not* cancel - // migration. See https://jira.mongodb.org/browse/SERVER-61964 - // TODO (SERVER-61964): remove conditional check for 50915 error - // and cancel migration if !status.isOK() - this->cancelMigration(); - } + auto status = fetchStatus->get(); + if (!status.isOK() && status.code() != 50915) { + // In the event of 50915: A checkpoint took place while + // opening a backup cursor, we should retry and *not* cancel + // migration. See https://jira.mongodb.org/browse/SERVER-61964 + // TODO (SERVER-61964): remove conditional check for 50915 error + // and cancel migration if !status.isOK() + this->cancelMigration(); + } - uassertStatusOK(status); - }); + uassertStatusOK(status); + }); } void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) { @@ -2099,6 +2100,10 @@ void TenantMigrationRecipientService::Instance::_interrupt(Status status, _cancelRemainingWork(lk); + if (_backupCursorFileFetcher) { + _backupCursorFileFetcher->shutdown(); + } + // If the task is running, then setting promise result will be taken care by the main task // continuation chain. if (_taskState.isNotStarted()) { @@ -2185,6 +2190,9 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta swap(savedDonorOplogFetcher, _donorOplogFetcher); swap(savedTenantOplogApplier, _tenantOplogApplier); swap(savedWriterPool, _writerPool); + + _backupCursorId = 0; + _backupCursorFileFetcher = nullptr; } // Perform join outside the lock to avoid deadlocks. @@ -2501,7 +2509,9 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( return SemiFuture<void>::makeReady().thenRunOn(**_scopedExecutor); } - return AsyncTry([this, token] { return _createFileFetcher(token); }) + return AsyncTry([this, self = shared_from_this(), token] { + return _createFileFetcher(token); + }) .until([](Status status) { if (status.code() == 50915) { LOGV2_DEBUG(6113008, @@ -2517,12 +2527,13 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( return true; }) - .on(_recipientService->getInstanceCleanupExecutor(), token); + .on(**_scopedExecutor, token); }) .then([this, self = shared_from_this()] { - // TODO (SERVER-61131) temporarily stop fetcher/backup cursor for - // now. Otherwise, nothing will actually shut down the backup cursor, - // since we won't be implementing that until later. + // TODO (SERVER-61131) temporarily stop fetcher/backup cursor here for + // now. We shut down the backup cursor in onCompletion continuation, but + // some tests fail unless we do this here, punting on dealing with those + // tests until later ticket(s) stdx::lock_guard lk(_mutex); _killBackupCursor(lk); _getStartOpTimesFromDonor(lk); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 14129b6c1c1..41518acb6e8 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -578,8 +578,9 @@ public: std::unique_ptr<DBClientConnection> _client; // (S) std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (S) - CursorId _backupCursorId; // (M) - NamespaceString _backupCursorNamespaceString; // (M) + CursorId _backupCursorId; // (M) + NamespaceString _backupCursorNamespaceString; // (M) + std::unique_ptr<Fetcher> _backupCursorFileFetcher; // (M) std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn = std::make_unique<CreateOplogFetcherFn>(); // (M) |