summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_recipient_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_recipient_service.cpp')
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp70
1 files changed, 42 insertions, 28 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 1e01d46a9be..d64e9af6433 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -95,6 +95,8 @@ using namespace fmt;
const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex";
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
+
+constexpr StringData kTenantOplogApplierProgressPrefix = "repl.migration.progress_"_sd;
constexpr int kBackupCursorFileFetcherRetryAttempts = 10;
constexpr int kCheckpointTsBackupCursorErrorCode = 6929900;
constexpr int kCloseCursorBeforeOpenErrorCode = 50886;
@@ -2672,16 +2674,7 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() {
stdx::unique_lock lk(_mutex);
const auto& cloneFinishedRecipientOpTime = _stateDoc.getCloneFinishedRecipientOpTime();
invariant(cloneFinishedRecipientOpTime);
-
- OpTime resumeOpTime;
- if (_sharedData->getResumePhase() == ResumePhase::kOplogCatchup) {
- lk.unlock();
- // We avoid holding the mutex while scanning the local oplog which
- // acquires the RSTL in IX mode. This is to allow us to be interruptable
- // via a concurrent stepDown which acquires the RSTL in X mode.
- resumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime);
- lk.lock();
- }
+ invariant(!cloneFinishedRecipientOpTime->isNull());
// Throwing error when cloner is canceled externally via interrupt(),
// makes the instance to skip the remaining task (i.e., starting oplog
@@ -2696,18 +2689,37 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() {
const auto& startApplyingDonorOpTime = _stateDoc.getStartApplyingDonorOpTime();
invariant(startApplyingDonorOpTime);
- _tenantOplogApplier = std::make_shared<TenantOplogApplier>(
- _migrationUuid,
- _protocol,
- (_protocol != MigrationProtocolEnum::kShardMerge) ? boost::make_optional(_tenantId)
- : boost::none,
- (!resumeOpTime.isNull()) ? std::max(resumeOpTime, *startApplyingDonorOpTime)
- : *startApplyingDonorOpTime,
- _donorOplogBuffer.get(),
- **_scopedExecutor,
- _writerPool.get(),
- resumeOpTime.getTimestamp());
- _tenantOplogApplier->setCloneFinishedRecipientOpTime(*cloneFinishedRecipientOpTime);
+ OpTime deprecatedResumeOpTime;
+ boost::optional<NamespaceString> nss = boost::none;
+ // If the feature compatibility version is below 7.0, fall back to oplog scanning for
+ // TenantOplogApplier resumption. Only versions 7.0 and later store oplog applier progress data in a replicated collection.
+ if (serverGlobalParams.featureCompatibility.isLessThan(
+ multiversion::FeatureCompatibilityVersion::kVersion_7_0)) {
+ // We avoid holding the mutex while scanning the local oplog which
+ // acquires the RSTL in IX mode. This is to allow us to be interruptible
+ // via a concurrent stepDown which acquires the RSTL in X mode.
+ lk.unlock();
+ // This node is not aware of the tenant oplog applier progress collection, so fall back to
+ // scanning the oplog to ensure that we don't throw away our previous progress.
+ deprecatedResumeOpTime = _getOplogResumeApplyingDonorOptime(*cloneFinishedRecipientOpTime);
+ lk.lock();
+ } else {
+ nss = NamespaceString::makeTenantOplogApplierProgressNSS(_migrationUuid);
+ }
+
+ _tenantOplogApplier = std::make_shared<TenantOplogApplier>(_migrationUuid,
+ _protocol,
+ _tenantId,
+ nss,
+ *startApplyingDonorOpTime,
+ _donorOplogBuffer.get(),
+ **_scopedExecutor,
+ _writerPool.get(),
+ *cloneFinishedRecipientOpTime,
+ _sharedData->getResumePhase() ==
+ ResumePhase::kOplogCatchup);
+
+ _tenantOplogApplier->setDeprecatedResumeOpTime(deprecatedResumeOpTime);
LOGV2_DEBUG(4881202,
1,
@@ -2715,8 +2727,7 @@ void TenantMigrationRecipientService::Instance::_startOplogApplier() {
"tenantId"_attr = getTenantId(),
"migrationId"_attr = getMigrationUUID(),
"startApplyingAfterDonorOpTime"_attr =
- _tenantOplogApplier->getStartApplyingAfterOpTime(),
- "resumeBatchingTs"_attr = _tenantOplogApplier->getResumeBatchingTs());
+ _tenantOplogApplier->getStartApplyingAfterOpTime());
uassertStatusOK(_tenantOplogApplier->startup());
_oplogApplierReady = true;
@@ -2856,15 +2867,18 @@ void TenantMigrationRecipientService::Instance::_dropTempCollections() {
auto opCtx = cc().makeOperationContext();
auto storageInterface = StorageInterface::get(opCtx.get());
- // The donated files and oplog buffer collections can be safely dropped at this
- // point. In case either collection does not exist, dropping will be a no-op.
- // It isn't necessary that a given drop is majority-committed. A new primary will
- // attempt to drop the collection anyway.
+ // The donated files, oplog buffer, and tenant oplog applier progress collections
+ // can be safely dropped at this point. If any of these collections does not exist,
+ // dropping it will be a no-op. It isn't necessary that a given drop is
+ // majority-committed. A new primary will attempt to drop the collections anyway.
uassertStatusOK(storageInterface->dropCollection(
opCtx.get(), shard_merge_utils::getDonatedFilesNs(getMigrationUUID())));
uassertStatusOK(
storageInterface->dropCollection(opCtx.get(), getOplogBufferNs(getMigrationUUID())));
+
+ uassertStatusOK(storageInterface->dropCollection(
+ opCtx.get(), NamespaceString::makeTenantOplogApplierProgressNSS(getMigrationUUID())));
}
SemiFuture<void> TenantMigrationRecipientService::Instance::run(