summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp63
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h5
2 files changed, 40 insertions, 28 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 6911f5509ba..77f469aa819 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -940,9 +940,8 @@ void TenantMigrationRecipientService::Instance::_killBackupCursor(WithLock lk) {
"Failed to run killCursors command on backup cursor",
"status"_attr = scheduleResult.getStatus());
}
-
- _backupCursorId = 0;
}
+
ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetcher(
const CancellationToken& token) {
stdx::lock_guard lk(_mutex);
@@ -965,7 +964,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch
auto returnedFiles = std::make_shared<std::vector<BSONObj>>();
auto fetchStatus = std::make_shared<boost::optional<Status>>();
- auto fetcherCallback = [this, fetchStatus, returnedFiles, token](
+ auto fetcherCallback = [this, self = shared_from_this(), fetchStatus, returnedFiles, token](
const Fetcher::QueryResponseStatus& dataStatus,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
@@ -1027,7 +1026,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch
getMoreBob->append("collection", data.nss.coll());
};
- auto fetcher = std::make_shared<Fetcher>(
+ _backupCursorFileFetcher = std::make_unique<Fetcher>(
(**_scopedExecutor).get(),
_client->getServerHostAndPort(),
NamespaceString::kAdminDb.toString(),
@@ -1040,26 +1039,28 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_createFileFetch
kBackupCursorFileFetcherRetryAttempts, executor::RemoteCommandRequest::kNoTimeout),
transport::kGlobalSSLMode);
- uassertStatusOK(fetcher->schedule());
+ uassertStatusOK(_backupCursorFileFetcher->schedule());
- return fetcher->onCompletion().thenRunOn(**_scopedExecutor).then([this, fetcher, fetchStatus] {
- if (!*fetchStatus) {
- // the callback was never invoked
- uasserted(6113007, "Internal error running cursor callback in command");
- }
+ return _backupCursorFileFetcher->onCompletion()
+ .thenRunOn(**_scopedExecutor)
+ .then([this, self = shared_from_this(), fetchStatus] {
+ if (!*fetchStatus) {
+ // the callback was never invoked
+ uasserted(6113007, "Internal error running cursor callback in command");
+ }
- auto status = fetchStatus->get();
- if (!status.isOK() && status.code() != 50915) {
- // In the event of 50915: A checkpoint took place while
- // opening a backup cursor, we should retry and *not* cancel
- // migration. See https://jira.mongodb.org/browse/SERVER-61964
- // TODO (SERVER-61964): remove conditional check for 50915 error
- // and cancel migration if !status.isOK()
- this->cancelMigration();
- }
+ auto status = fetchStatus->get();
+ if (!status.isOK() && status.code() != 50915) {
+ // In the event of 50915: A checkpoint took place while
+ // opening a backup cursor, we should retry and *not* cancel
+ // migration. See https://jira.mongodb.org/browse/SERVER-61964
+ // TODO (SERVER-61964): remove conditional check for 50915 error
+ // and cancel migration if !status.isOK()
+ this->cancelMigration();
+ }
- uassertStatusOK(status);
- });
+ uassertStatusOK(status);
+ });
}
void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) {
@@ -2099,6 +2100,10 @@ void TenantMigrationRecipientService::Instance::_interrupt(Status status,
_cancelRemainingWork(lk);
+ if (_backupCursorFileFetcher) {
+ _backupCursorFileFetcher->shutdown();
+ }
+
// If the task is running, then setting promise result will be taken care by the main task
// continuation chain.
if (_taskState.isNotStarted()) {
@@ -2185,6 +2190,9 @@ void TenantMigrationRecipientService::Instance::_cleanupOnDataSyncCompletion(Sta
swap(savedDonorOplogFetcher, _donorOplogFetcher);
swap(savedTenantOplogApplier, _tenantOplogApplier);
swap(savedWriterPool, _writerPool);
+
+ _backupCursorId = 0;
+ _backupCursorFileFetcher = nullptr;
}
// Perform join outside the lock to avoid deadlocks.
@@ -2501,7 +2509,9 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return SemiFuture<void>::makeReady().thenRunOn(**_scopedExecutor);
}
- return AsyncTry([this, token] { return _createFileFetcher(token); })
+ return AsyncTry([this, self = shared_from_this(), token] {
+ return _createFileFetcher(token);
+ })
.until([](Status status) {
if (status.code() == 50915) {
LOGV2_DEBUG(6113008,
@@ -2517,12 +2527,13 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
return true;
})
- .on(_recipientService->getInstanceCleanupExecutor(), token);
+ .on(**_scopedExecutor, token);
})
.then([this, self = shared_from_this()] {
- // TODO (SERVER-61131) temporarily stop fetcher/backup cursor for
- // now. Otherwise, nothing will actually shut down the backup cursor,
- // since we won't be implementing that until later.
+ // TODO (SERVER-61131) temporarily stop fetcher/backup cursor here for
+ // now. We shut down the backup cursor in onCompletion continuation, but
+ // some tests fail unless we do this here, punting on dealing with those
+ // tests until later ticket(s)
stdx::lock_guard lk(_mutex);
_killBackupCursor(lk);
_getStartOpTimesFromDonor(lk);
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 14129b6c1c1..41518acb6e8 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -578,8 +578,9 @@ public:
std::unique_ptr<DBClientConnection> _client; // (S)
std::unique_ptr<DBClientConnection> _oplogFetcherClient; // (S)
- CursorId _backupCursorId; // (M)
- NamespaceString _backupCursorNamespaceString; // (M)
+ CursorId _backupCursorId; // (M)
+ NamespaceString _backupCursorNamespaceString; // (M)
+ std::unique_ptr<Fetcher> _backupCursorFileFetcher; // (M)
std::unique_ptr<OplogFetcherFactory> _createOplogFetcherFn =
std::make_unique<CreateOplogFetcherFn>(); // (M)