diff options
author | Lingzhi Deng <lingzhi.deng@mongodb.com> | 2021-03-31 14:24:33 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-04-05 15:07:12 +0000 |
commit | a5e7e1bf475025df6faf17709ba9e34b83c26e69 (patch) | |
tree | 04e96347184ba620433e9de075518fdf636c472a /src/mongo | |
parent | 3b0d71d80ebf5756502fc94ba086de30599b951d (diff) | |
download | mongo-a5e7e1bf475025df6faf17709ba9e34b83c26e69.tar.gz |
SERVER-55357: Start a new retryable write chain on each migration
(cherry picked from commit 03fbe539140630c6da4a29dfee160649d6453457)
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_recipient_service.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 57 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.h | 8 |
3 files changed, 46 insertions, 21 deletions
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 54b09f2f63a..9b253ba6f9a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -2053,6 +2053,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( "migrationId"_attr = getMigrationUUID()); { stdx::lock_guard lk(_mutex); + _tenantOplogApplier->setCloneFinishedRecipientOpTime( + *_stateDoc.getCloneFinishedRecipientOpTime()); uassertStatusOK(_tenantOplogApplier->startup()); _isRestartingOplogApplier = false; _restartOplogApplierCondVar.notify_all(); diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 5c0033802e5..3849180874e 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -110,6 +110,14 @@ Timestamp TenantOplogApplier::getResumeBatchingTs_forTest() const { return _resumeBatchingTs; } +void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime) { + stdx::lock_guard lk(_mutex); + invariant(!_isActive_inlock()); + invariant(!cloneFinishedRecipientOpTime.isNull()); + invariant(_cloneFinishedRecipientOpTime.isNull()); + _cloneFinishedRecipientOpTime = cloneFinishedRecipientOpTime; +} + Status TenantOplogApplier::_doStartup_inlock() noexcept { _oplogBatcher = std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor, _resumeBatchingTs); @@ -605,19 +613,20 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( auto sessionId = *entry.getSessionId(); auto txnNumber = *entry.getTxnNumber(); auto entryStmtIds = entry.getStatementIds(); + LOGV2_DEBUG(5351000, + 2, + "Tenant Oplog Applier processing retryable write", + "entry"_attr = redact(entry.toBSONForLogging()), + "sessionId"_attr = sessionId, + "txnNumber"_attr = txnNumber, + "statementIds"_attr = entryStmtIds, + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid); if (entry.getOpType() == repl::OpTypeEnum::kNoop) { // There are two types of no-ops we expect here. One is pre/post image, which // will have an empty o2 field. The other is previously transformed oplog // entries from earlier migrations. - LOGV2_DEBUG(5351000, - 2, - "Tenant Oplog Applier processing retryable write no-op", - "entry"_attr = redact(entry.toBSONForLogging()), - "sessionId"_attr = sessionId, - "txnNumber"_attr = txnNumber, - "statementIds"_attr = entryStmtIds, - "tenant"_attr = _tenantId, - "migrationUuid"_attr = _migrationUuid); + // We don't wrap the no-ops in another no-op. // If object2 is missing, this is a preImage/postImage. if (!entry.getObject2()) { @@ -651,16 +660,6 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( } stmtIds.insert(stmtIds.end(), entryStmtIds.begin(), entryStmtIds.end()); - LOGV2_DEBUG(5350901, - 2, - "Tenant Oplog Applier processing retryable write", - "sessionId"_attr = sessionId, - "txnNumber"_attr = txnNumber, - "statementIds"_attr = entryStmtIds, - "tenant"_attr = _tenantId, - "noop_entry"_attr = redact(noopEntry.toBSON()), - "migrationUuid"_attr = _migrationUuid); - if (entry.getPreImageOpTime()) { uassert( 5351005, @@ -732,7 +731,18 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( << txnNumber << " statement " << entryStmtIds.front() << " on session " << sessionId, !txnParticipant.checkStatementExecutedNoOplogEntryFetch(entryStmtIds.front())); - prevWriteOpTime = txnParticipant.getLastWriteOpTime(); + + // We could have an existing lastWriteOpTime for the same retryable write chain from a + // previously aborted migration. This could also happen if the tenant being migrated has + // previously resided in this replica set. So we want to start a new history chain + // instead of linking the newly generated no-op to the existing chain before the current + // migration starts. Otherwise, we could have duplicate entries for the same stmtId. + invariant(!_cloneFinishedRecipientOpTime.isNull()); + if (txnParticipant.getLastWriteOpTime() > _cloneFinishedRecipientOpTime) { + prevWriteOpTime = txnParticipant.getLastWriteOpTime(); + } else { + prevWriteOpTime = OpTime(); + } // Set sessionId, txnNumber, and statementId for all ops in a retryable write. noopEntry.setSessionId(sessionId); @@ -755,6 +765,13 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( noopEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime); + LOGV2_DEBUG(5535700, + 2, + "Tenant Oplog Applier writing session no-op", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "op"_attr = redact(noopEntry.toBSON())); + AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite); writeConflictRetry( opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] { diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h index 3da5212f979..8abfc3fca14 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.h +++ b/src/mongo/db/repl/tenant_oplog_applier.h @@ -95,6 +95,10 @@ public: return _numOpsApplied; } + /** + * This should only be called once before the applier starts. + */ + void setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime); /** * Returns the optime the applier will start applying from. Used for testing. @@ -158,10 +162,12 @@ private: const OpTime _beginApplyingAfterOpTime; // (R) RandomAccessOplogBuffer* _oplogBuffer; // (R) std::shared_ptr<executor::TaskExecutor> _executor; // (R) + // All no-op entries written by this tenant migration should have OpTime greater than this + // OpTime. + OpTime _cloneFinishedRecipientOpTime = OpTime(); // (R) // Keeps track of last applied donor and recipient optimes by the tenant oplog applier. // This gets updated only on batch boundaries. OpTimePair _lastAppliedOpTimesUpToLastBatch; // (M) - std::vector<OpTimePair> _opTimeMapping; // (M) // Pool of worker threads for writing ops to the databases. // Not owned by us. ThreadPool* const _writerPool; // (S) |