summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_recipient_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service.cpp')
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp117
1 files changed, 74 insertions, 43 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 57ec62819a1..c6c4d4d2958 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -76,6 +76,7 @@
#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/assert_util.h"
@@ -93,6 +94,8 @@ const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex";
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
constexpr int kBackupCursorFileFetcherRetryAttempts = 10;
+constexpr int kCheckpointTsBackupCursorErrorCode = 6929900;
+constexpr int kCloseCursorBeforeOpenErrorCode = 50886;
NamespaceString getOplogBufferNs(const UUID& migrationUUID) {
return NamespaceString(NamespaceString::kConfigDb,
@@ -979,25 +982,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor()
nullptr);
request.sslMode = _donorUri.getSSLMode();
- auto scheduleResult =
- (_recipientService->getInstanceCleanupExecutor())
- ->scheduleRemoteCommand(
- request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- if (!args.response.isOK()) {
- LOGV2_WARNING(6113005,
- "killCursors command task failed",
- "error"_attr = redact(args.response.status));
- return;
- }
- auto status = getStatusFromCommandResult(args.response.data);
- if (status.isOK()) {
- LOGV2_INFO(6113415, "Killed backup cursor");
- } else {
- LOGV2_WARNING(6113006,
- "killCursors command failed",
- "error"_attr = redact(status));
- }
- });
+ const auto scheduleResult = _scheduleKillBackupCursorWithLock(
+ lk, _recipientService->getInstanceCleanupExecutor());
if (!scheduleResult.isOK()) {
LOGV2_WARNING(6113004,
"Failed to run killCursors command on backup cursor",
@@ -1009,13 +995,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_killBackupCursor()
SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
const CancellationToken& token) {
- stdx::lock_guard lk(_mutex);
- LOGV2_DEBUG(6113000,
- 1,
- "Trying to open backup cursor on donor primary",
- "migrationId"_attr = _stateDoc.getId(),
- "donorConnectionString"_attr = _stateDoc.getDonorConnectionString());
- const auto cmdObj = [] {
+
+ const auto aggregateCommandRequestObj = [] {
AggregateCommandRequest aggRequest(
NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
{BSON("$backupCursor" << BSONObj())});
@@ -1024,11 +1005,18 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
return aggRequest.toBSON(BSONObj());
}();
- auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp();
+ stdx::lock_guard lk(_mutex);
+ LOGV2_DEBUG(6113000,
+ 1,
+ "Trying to open backup cursor on donor primary",
+ "migrationId"_attr = _stateDoc.getId(),
+ "donorConnectionString"_attr = _stateDoc.getDonorConnectionString());
+
+ const auto startMigrationDonorTimestamp = _stateDoc.getStartMigrationDonorTimestamp();
auto fetchStatus = std::make_shared<boost::optional<Status>>();
auto uniqueMetadataInfo = std::make_unique<boost::optional<shard_merge_utils::MetadataInfo>>();
- auto fetcherCallback =
+ const auto fetcherCallback =
[
this,
self = shared_from_this(),
@@ -1043,8 +1031,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
uassertStatusOK(dataStatus);
uassert(ErrorCodes::CallbackCanceled, "backup cursor interrupted", !token.isCanceled());
- auto uniqueOpCtx = cc().makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
+ const auto uniqueOpCtx = cc().makeOperationContext();
+ const auto opCtx = uniqueOpCtx.get();
const auto& data = dataStatus.getValue();
for (const BSONObj& doc : data.documents) {
@@ -1059,14 +1047,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
"backupCursorId"_attr = data.cursorId,
"backupCursorCheckpointTimestamp"_attr = checkpointTimestamp);
- // This ensures that the recipient won’t receive any 2 phase index build donor
- // oplog entries during the migration. We also have a check in the tenant oplog
- // applier to detect such oplog entries. Adding a check here helps us to detect
- // the problem earlier.
- uassert(6929900,
- "backupCursorCheckpointTimestamp should be greater than or equal to "
- "startMigrationDonorTimestamp",
- checkpointTimestamp >= startMigrationDonorTimestamp);
{
stdx::lock_guard lk(_mutex);
stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
@@ -1075,6 +1055,15 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
BackupCursorInfo{data.cursorId, data.nss, checkpointTimestamp});
}
+ // This ensures that the recipient won’t receive any 2 phase index build donor
+ // oplog entries during the migration. We also have a check in the tenant oplog
+ // applier to detect such oplog entries. Adding a check here helps us to detect
+ // the problem earlier.
+ uassert(kCheckpointTsBackupCursorErrorCode,
+ "backupCursorCheckpointTimestamp should be greater than or equal to "
+ "startMigrationDonorTimestamp",
+ checkpointTimestamp >= startMigrationDonorTimestamp);
+
invariant(metadataInfoPtr && !*metadataInfoPtr);
(*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo(
getMigrationUUID(), _client->getServerAddress(), metadata);
@@ -1133,10 +1122,10 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
};
_donorFilenameBackupCursorFileFetcher = std::make_unique<Fetcher>(
- (**_scopedExecutor).get(),
+ _backupCursorExecutor.get(),
_client->getServerHostAndPort(),
NamespaceString::kAdminDb.toString(),
- cmdObj,
+ aggregateCommandRequestObj,
fetcherCallback,
ReadPreferenceSetting(ReadPreference::PrimaryPreferred).toContainingBSON(),
executor::RemoteCommandRequest::kNoTimeout, /* aggregateTimeout */
@@ -1160,6 +1149,35 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursor(
.semi();
}
+StatusWith<executor::TaskExecutor::CallbackHandle>
+TenantMigrationRecipientService::Instance::_scheduleKillBackupCursorWithLock(
+ WithLock lk, std::shared_ptr<executor::TaskExecutor> executor) {
+ auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk);
+ executor::RemoteCommandRequest killCursorsRequest(
+ _client->getServerHostAndPort(),
+ donorBackupCursorInfo.nss.db().toString(),
+ BSON("killCursors" << donorBackupCursorInfo.nss.coll().toString() << "cursors"
+ << BSON_ARRAY(donorBackupCursorInfo.cursorId)),
+ nullptr);
+ killCursorsRequest.sslMode = _donorUri.getSSLMode();
+
+ return executor->scheduleRemoteCommand(
+ killCursorsRequest, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
+ if (!args.response.isOK()) {
+ LOGV2_WARNING(6113005,
+ "killCursors command task failed",
+ "error"_attr = redact(args.response.status));
+ return;
+ }
+ auto status = getStatusFromCommandResult(args.response.data);
+ if (status.isOK()) {
+ LOGV2_INFO(6113415, "Killed backup cursor");
+ } else {
+ LOGV2_WARNING(6113006, "killCursors command failed", "error"_attr = redact(status));
+ }
+ });
+}
+
SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWithRetry(
const CancellationToken& token) {
return AsyncTry([this, self = shared_from_this(), token] { return _openBackupCursor(token); })
@@ -1169,8 +1187,20 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_openBackupCursorWit
"Retrying backup cursor creation after transient error",
"migrationId"_attr = getMigrationUUID(),
"status"_attr = status);
- // A checkpoint took place while opening a backup cursor. We
- // should retry and *not* cancel migration.
+
+ return false;
+ } else if (status.code() == kCheckpointTsBackupCursorErrorCode ||
+ status.code() == kCloseCursorBeforeOpenErrorCode) {
+ LOGV2_INFO(6955100,
+ "Closing backup cursor and retrying after getting retryable error",
+ "migrationId"_attr = getMigrationUUID(),
+ "status"_attr = status);
+
+ stdx::lock_guard lk(_mutex);
+ const auto scheduleResult =
+ _scheduleKillBackupCursorWithLock(lk, _backupCursorExecutor);
+ uassertStatusOK(scheduleResult);
+
return false;
}
@@ -1199,7 +1229,7 @@ void TenantMigrationRecipientService::Instance::_keepBackupCursorAlive(
auto& donorBackupCursorInfo = _getDonorBackupCursorInfo(lk);
_backupCursorKeepAliveFuture =
shard_merge_utils::keepBackupCursorAlive(_backupCursorKeepAliveCancellation,
- **_scopedExecutor,
+ _backupCursorExecutor,
_client->getServerHostAndPort(),
donorBackupCursorInfo.cursorId,
donorBackupCursorInfo.nss);
@@ -2703,6 +2733,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
_scopedExecutor = executor;
+ _backupCursorExecutor = **_scopedExecutor;
auto scopedOutstandingMigrationCounter =
TenantMigrationStatistics::get(_serviceContext)->getScopedOutstandingReceivingCount();