From 0ccc9275efc3e4c36850bd4cc297c90152b7a7e6 Mon Sep 17 00:00:00 2001 From: Judah Schvimer Date: Thu, 8 Oct 2020 15:52:39 +0000 Subject: SERVER-51246 Write a noop into the oplog buffer after each batch to ensure tenant applier reaches stop timestamp --- src/mongo/db/repl/tenant_oplog_applier.cpp | 49 ++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 9 deletions(-) (limited to 'src/mongo/db/repl/tenant_oplog_applier.cpp') diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 72c5294867f..157961b3749 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -46,6 +46,7 @@ #include "mongo/db/repl/insert_group.h" #include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/repl/tenant_migration_decoration.h" +#include "mongo/db/repl/tenant_migration_recipient_service.h" #include "mongo/db/repl/tenant_oplog_batcher.h" #include "mongo/logv2/log.h" #include "mongo/util/concurrency/thread_pool.h" @@ -224,8 +225,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) { "Tenant Oplog Applier finished applying batch", "tenant"_attr = _tenantId, "migrationUuid"_attr = _migrationUuid, - "lastDonorOptime"_attr = lastBatchCompletedOpTimes.donorOpTime, - "lastRecipientOptime"_attr = lastBatchCompletedOpTimes.recipientOpTime); + "lastBatchCompletedOpTimes"_attr = lastBatchCompletedOpTimes); // Notify all the waiters on optimes before and including _lastBatchCompletedOpTimes. auto firstUnexpiredIter = @@ -285,6 +285,21 @@ void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx, } } +namespace { +bool isResumeTokenNoop(const OplogEntry& entry) { + if (entry.getOpType() != OpTypeEnum::kNoop) { + return false; + } + if (!entry.getObject().hasField("msg")) { + return false; + } + if (entry.getObject().getStringField("msg") != TenantMigrationRecipientService::kNoopMsg) { + return false; + } + return true; +} +} // namespace + TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( OperationContext* opCtx, const TenantOplogBatch& batch) { auto* opObserver = cc().getServiceContext()->getOpObserver(); @@ -292,8 +307,18 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( WriteUnitOfWork wuow(opCtx); // Reserve oplog slots for all entries. This allows us to write them in parallel. auto oplogSlots = repl::getNextOpTimes(opCtx, batch.ops.size()); + // Keep track of the greatest oplog slot actually used, ignoring resume token noops. This is + // what we want to return from this function. + auto greatestOplogSlotUsed = OpTime(); auto slotIter = oplogSlots.begin(); for (const auto& op : batch.ops) { + if (isResumeTokenNoop(op.entry)) { + // We do not want to set the recipient optime for resume token noop oplog entries since + // we won't actually apply them. + slotIter++; + continue; + } + greatestOplogSlotUsed = *slotIter; _setRecipientOpTime(op.entry.getOpTime(), *slotIter++); } const size_t numOplogThreads = _writerPool->getStats().numThreads; @@ -326,7 +351,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( } invariant(opsIter == batch.ops.end()); _writerPool->waitForIdle(); - return {batch.ops.back().entry.getOpTime(), oplogSlots.back()}; + return {batch.ops.back().entry.getOpTime(), greatestOplogSlotUsed}; } @@ -379,18 +404,24 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver, WriteUnitOfWork wuow(opCtx.get()); auto slot = firstSlot; for (auto iter = begin; iter != end; iter++, slot++) { + const auto& entry = iter->entry; + if (isResumeTokenNoop(entry)) { + // We don't want to write noops for resume token noop oplog entries. They would + // not be applied in a change stream anyways. + continue; + } opObserver->onInternalOpMessage( opCtx.get(), - iter->entry.getNss(), - iter->entry.getUuid(), - iter->entry.toBSON(), + entry.getNss(), + entry.getUuid(), + entry.toBSON(), BSONObj(), // We link the no-ops together by recipient op time the same way the actual ops // were linked together by donor op time. This is to allow retryable writes // and changestreams to find the ops they need. - _maybeGetRecipientOpTime(iter->entry.getPreImageOpTime()), - _maybeGetRecipientOpTime(iter->entry.getPostImageOpTime()), - _maybeGetRecipientOpTime(iter->entry.getPrevWriteOpTimeInTransaction()), + _maybeGetRecipientOpTime(entry.getPreImageOpTime()), + _maybeGetRecipientOpTime(entry.getPostImageOpTime()), + _maybeGetRecipientOpTime(entry.getPrevWriteOpTimeInTransaction()), *slot); } }); -- cgit v1.2.1