author     Lingzhi Deng <lingzhi.deng@mongodb.com>    2021-03-31 14:24:33 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-04-05 15:07:12 +0000
commit     a5e7e1bf475025df6faf17709ba9e34b83c26e69
tree       04e96347184ba620433e9de075518fdf636c472a /src/mongo
parent     3b0d71d80ebf5756502fc94ba086de30599b951d
download   mongo-a5e7e1bf475025df6faf17709ba9e34b83c26e69.tar.gz
SERVER-55357: Start a new retryable write chain on each migration
(cherry picked from commit 03fbe539140630c6da4a29dfee160649d6453457)
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/repl/tenant_migration_recipient_service.cpp |  2
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.cpp               | 57
-rw-r--r--  src/mongo/db/repl/tenant_oplog_applier.h                 |  8
3 files changed, 46 insertions(+), 21 deletions(-)
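
The core of this commit: the tenant oplog applier now starts a fresh retryable-write history chain whenever a session's last write predates the current migration's cloneFinishedRecipientOpTime. A minimal self-contained sketch of that decision, using a simplified stand-in for repl::OpTime (the real type pairs a Timestamp with a term); choosePrevWriteOpTime is a hypothetical helper, in the actual patch this logic is inline in _writeSessionNoOpsForRange:

#include <cassert>
#include <tuple>

// Simplified stand-in for repl::OpTime: a timestamp plus a term, where a
// default-constructed value is "null" and sorts before any real optime.
struct OpTime {
    long long ts = 0;
    long long term = -1;
    bool isNull() const { return ts == 0 && term == -1; }
    bool operator>(const OpTime& other) const {
        return std::tie(ts, term) > std::tie(other.ts, other.term);
    }
};

// Mirrors the decision in _writeSessionNoOpsForRange: link the new no-op to
// the session's last write only if that write happened during this migration;
// otherwise return a null OpTime so a new history chain starts.
OpTime choosePrevWriteOpTime(const OpTime& lastWriteOpTime,
                             const OpTime& cloneFinishedRecipientOpTime) {
    assert(!cloneFinishedRecipientOpTime.isNull());
    if (lastWriteOpTime > cloneFinishedRecipientOpTime) {
        return lastWriteOpTime;  // written by this migration: keep chaining
    }
    return OpTime();  // pre-migration history: start a new chain
}

int main() {
    OpTime cloneFinished{100, 1};
    assert(choosePrevWriteOpTime({150, 1}, cloneFinished) > cloneFinished);
    assert(choosePrevWriteOpTime({50, 1}, cloneFinished).isNull());
    return 0;
}
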
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 54b09f2f63a..9b253ba6f9a 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -2053,6 +2053,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
"migrationId"_attr = getMigrationUUID());
{
stdx::lock_guard lk(_mutex);
+ _tenantOplogApplier->setCloneFinishedRecipientOpTime(
+ *_stateDoc.getCloneFinishedRecipientOpTime());
uassertStatusOK(_tenantOplogApplier->startup());
_isRestartingOplogApplier = false;
_restartOplogApplierCondVar.notify_all();
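
The call site above installs the clone-finished optime under _mutex and only then starts the applier; setCloneFinishedRecipientOpTime (added in the next file) enforces this ordering with invariants. A rough sketch of the same set-once-before-startup pattern, with simplified types (ApplierSketch is hypothetical, not the real class):

#include <cassert>
#include <mutex>

// Hypothetical, simplified applier illustrating the ordering contract:
// the optime may be installed only while inactive, and only once.
class ApplierSketch {
public:
    void setCloneFinishedOpTime(long long opTime) {
        std::lock_guard<std::mutex> lk(_mutex);
        assert(!_active);                   // must precede startup()
        assert(opTime != 0);                // a real (non-null) optime
        assert(_cloneFinishedOpTime == 0);  // set exactly once
        _cloneFinishedOpTime = opTime;
    }
    void startup() {
        std::lock_guard<std::mutex> lk(_mutex);
        _active = true;
    }
private:
    std::mutex _mutex;
    bool _active = false;
    long long _cloneFinishedOpTime = 0;  // 0 stands in for a null OpTime
};

int main() {
    ApplierSketch applier;
    applier.setCloneFinishedOpTime(42);  // set first...
    applier.startup();                   // ...then start
    return 0;
}
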
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 5c0033802e5..3849180874e 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -110,6 +110,14 @@ Timestamp TenantOplogApplier::getResumeBatchingTs_forTest() const {
return _resumeBatchingTs;
}
+void TenantOplogApplier::setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime) {
+ stdx::lock_guard lk(_mutex);
+ invariant(!_isActive_inlock());
+ invariant(!cloneFinishedRecipientOpTime.isNull());
+ invariant(_cloneFinishedRecipientOpTime.isNull());
+ _cloneFinishedRecipientOpTime = cloneFinishedRecipientOpTime;
+}
+
Status TenantOplogApplier::_doStartup_inlock() noexcept {
_oplogBatcher =
std::make_shared<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor, _resumeBatchingTs);
@@ -605,19 +613,20 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
auto sessionId = *entry.getSessionId();
auto txnNumber = *entry.getTxnNumber();
auto entryStmtIds = entry.getStatementIds();
+ LOGV2_DEBUG(5351000,
+ 2,
+ "Tenant Oplog Applier processing retryable write",
+ "entry"_attr = redact(entry.toBSONForLogging()),
+ "sessionId"_attr = sessionId,
+ "txnNumber"_attr = txnNumber,
+ "statementIds"_attr = entryStmtIds,
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid);
if (entry.getOpType() == repl::OpTypeEnum::kNoop) {
// There are two types of no-ops we expect here. One is pre/post image, which
// will have an empty o2 field. The other is previously transformed oplog
// entries from earlier migrations.
- LOGV2_DEBUG(5351000,
- 2,
- "Tenant Oplog Applier processing retryable write no-op",
- "entry"_attr = redact(entry.toBSONForLogging()),
- "sessionId"_attr = sessionId,
- "txnNumber"_attr = txnNumber,
- "statementIds"_attr = entryStmtIds,
- "tenant"_attr = _tenantId,
- "migrationUuid"_attr = _migrationUuid);
+
// We don't wrap the no-ops in another no-op.
// If object2 is missing, this is a preImage/postImage.
if (!entry.getObject2()) {
@@ -651,16 +660,6 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
}
stmtIds.insert(stmtIds.end(), entryStmtIds.begin(), entryStmtIds.end());
- LOGV2_DEBUG(5350901,
- 2,
- "Tenant Oplog Applier processing retryable write",
- "sessionId"_attr = sessionId,
- "txnNumber"_attr = txnNumber,
- "statementIds"_attr = entryStmtIds,
- "tenant"_attr = _tenantId,
- "noop_entry"_attr = redact(noopEntry.toBSON()),
- "migrationUuid"_attr = _migrationUuid);
-
if (entry.getPreImageOpTime()) {
uassert(
5351005,
@@ -732,7 +731,18 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
<< txnNumber << " statement " << entryStmtIds.front()
<< " on session " << sessionId,
!txnParticipant.checkStatementExecutedNoOplogEntryFetch(entryStmtIds.front()));
- prevWriteOpTime = txnParticipant.getLastWriteOpTime();
+
+ // We could have an existing lastWriteOpTime for the same retryable write chain from a
+ // previously aborted migration. This could also happen if the tenant being migrated has
+ // previously resided in this replica set. So we want to start a new history chain
+ // instead of linking the newly generated no-op to the existing chain before the current
+ // migration starts. Otherwise, we could have duplicate entries for the same stmtId.
+ invariant(!_cloneFinishedRecipientOpTime.isNull());
+ if (txnParticipant.getLastWriteOpTime() > _cloneFinishedRecipientOpTime) {
+ prevWriteOpTime = txnParticipant.getLastWriteOpTime();
+ } else {
+ prevWriteOpTime = OpTime();
+ }
// Set sessionId, txnNumber, and statementId for all ops in a retryable write.
noopEntry.setSessionId(sessionId);
@@ -755,6 +765,13 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
noopEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime);
+ LOGV2_DEBUG(5535700,
+ 2,
+ "Tenant Oplog Applier writing session no-op",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "op"_attr = redact(noopEntry.toBSON()));
+
AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite);
writeConflictRetry(
opCtx.get(), "writeTenantNoOps", NamespaceString::kRsOplogNamespace.ns(), [&] {
diff --git a/src/mongo/db/repl/tenant_oplog_applier.h b/src/mongo/db/repl/tenant_oplog_applier.h
index 3da5212f979..8abfc3fca14 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.h
+++ b/src/mongo/db/repl/tenant_oplog_applier.h
@@ -95,6 +95,10 @@ public:
return _numOpsApplied;
}
+ /**
+ * This should only be called once before the applier starts.
+ */
+ void setCloneFinishedRecipientOpTime(OpTime cloneFinishedRecipientOpTime);
/**
* Returns the optime the applier will start applying from. Used for testing.
@@ -158,10 +162,12 @@ private:
const OpTime _beginApplyingAfterOpTime; // (R)
RandomAccessOplogBuffer* _oplogBuffer; // (R)
std::shared_ptr<executor::TaskExecutor> _executor; // (R)
+ // All no-op entries written by this tenant migration should have OpTime greater than this
+ // OpTime.
+ OpTime _cloneFinishedRecipientOpTime = OpTime(); // (R)
// Keeps track of last applied donor and recipient optimes by the tenant oplog applier.
// This gets updated only on batch boundaries.
OpTimePair _lastAppliedOpTimesUpToLastBatch; // (M)
- std::vector<OpTimePair> _opTimeMapping; // (M)
// Pool of worker threads for writing ops to the databases.
// Not owned by us.
ThreadPool* const _writerPool; // (S)
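
One more sketch, for the no-op classification described in _writeSessionNoOpsForRange above: a session no-op with an empty o2 field is a pre/post image, while one carrying o2 is an entry already transformed by an earlier migration. A minimal illustration with a simplified entry type (SessionNoop and classify are hypothetical, not MongoDB's OplogEntry API):

#include <iostream>
#include <optional>
#include <string>

// Hypothetical, simplified session no-op carrying only the o2 field.
struct SessionNoop {
    std::optional<std::string> object2;  // the o2 field, if any
};

const char* classify(const SessionNoop& entry) {
    // Mirrors the check `if (!entry.getObject2())` in the diff above.
    return entry.object2 ? "previously transformed oplog entry"
                         : "pre/post image";
}

int main() {
    std::cout << classify({std::nullopt}) << '\n';          // pre/post image
    std::cout << classify({std::string{"{...}"}}) << '\n';  // transformed entry
    return 0;
}
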