diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2020-08-24 10:34:43 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-24 16:13:09 +0000 |
commit | f98014e47b4163021e4d6ce07f920703e4aede81 (patch) | |
tree | dcd1835f375a88efc8d72c7ecf7ad9c50ca63ad8 /src/mongo/db/repl/tenant_oplog_applier.cpp | |
parent | 43f5d658cded44a342a7e56db3ebe7ffd272abf1 (diff) | |
download | mongo-f98014e47b4163021e4d6ce07f920703e4aede81.tar.gz |
SERVER-48860 Make migration oplog applier class apply oplog entries.
Diffstat (limited to 'src/mongo/db/repl/tenant_oplog_applier.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_oplog_applier.cpp | 242 |
1 files changed, 215 insertions, 27 deletions
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index e3aae536a18..fbc4ab9355c 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -36,10 +36,15 @@ #include <algorithm> #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" #include "mongo/db/op_observer.h" #include "mongo/db/repl/apply_ops.h" +#include "mongo/db/repl/cloner_utils.h" +#include "mongo/db/repl/insert_group.h" +#include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/repl/tenant_migration_decoration.h" #include "mongo/db/repl/tenant_oplog_batcher.h" #include "mongo/logv2/log.h" @@ -68,25 +73,17 @@ TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid, : AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId), _migrationUuid(migrationUuid), _tenantId(tenantId), - _applyFromOpTime(applyFromOpTime), + _beginApplyingAfterOpTime(applyFromOpTime), _oplogBuffer(oplogBuffer), _executor(std::move(executor)), - _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps) {} - -void TenantOplogApplier::_makeWriterPool_inlock(int threadCount) { - ThreadPool::Options options; - options.threadNamePrefix = "TenantOplogWriter-" + _tenantId + "-"; - options.poolName = "TenantOplogWriterThreadPool-" + _tenantId; - options.maxThreads = options.minThreads = static_cast<size_t>(threadCount); - options.onCreateThread = [migrationUuid = this->_migrationUuid](const std::string&) { - Client::initThread(getThreadName()); - AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); - }; - _writerPool = std::make_unique<ThreadPool>(options); - _writerPool->startup(); -} + _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps), + _applierThreadCount(kTenantApplierThreadCount) {} -TenantOplogApplier::~TenantOplogApplier() {} + +TenantOplogApplier::~TenantOplogApplier() { + shutdown(); + join(); +} SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationForOpTime( OpTime donorOpTime) { @@ -107,7 +104,7 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo } Status TenantOplogApplier::_doStartup_inlock() noexcept { - _makeWriterPool_inlock(kTenantApplierThreadCount); + _writerPool = makeReplWriterPool(_applierThreadCount, "TenantOplogWriter"_sd); _oplogBatcher = std::make_unique<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor); auto status = _oplogBatcher->startup(); if (!status.isOK()) @@ -132,7 +129,7 @@ void TenantOplogApplier::_applyLoop(TenantOplogBatch batch) { // while the applier is processing the current one. auto nextBatchFuture = _oplogBatcher->getNextBatch(_limits); try { - _applyOplogBatch(batch); + _applyOplogBatch(&batch); } catch (const DBException& e) { _handleError(e.toStatus()); return; @@ -157,7 +154,7 @@ void TenantOplogApplier::_handleError(Status status) { "TenantOplogApplier::_handleError", "tenant"_attr = _tenantId, "migrationUuid"_attr = _migrationUuid, - "status"_attr = status); + "error"_attr = redact(status)); shutdown(); stdx::lock_guard lk(_mutex); // If we reach _handleError, it means the applyLoop is not running. @@ -176,15 +173,52 @@ void TenantOplogApplier::_finishShutdown(WithLock, Status status) { _transitionToComplete_inlock(); } -void TenantOplogApplier::_applyOplogBatch(const TenantOplogBatch& batch) { +void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) { LOGV2_DEBUG(4886004, 1, "Tenant Oplog Applier starting to apply batch", "tenant"_attr = _tenantId, "migrationUuid"_attr = _migrationUuid, - "firstDonorOptime"_attr = batch.ops.front().entry.getOpTime(), - "lastDonorOptime"_attr = batch.ops.back().entry.getOpTime()); - auto lastBatchCompletedOpTimes = _writeNoOpEntries(batch); + "firstDonorOptime"_attr = batch->ops.front().entry.getOpTime(), + "lastDonorOptime"_attr = batch->ops.back().entry.getOpTime()); + auto opCtx = cc().makeOperationContext(); + _checkNsAndUuidsBelongToTenant(opCtx.get(), *batch); + auto writerVectors = _fillWriterVectors(opCtx.get(), batch); + std::vector<Status> statusVector(writerVectors.size(), Status::OK()); + for (size_t i = 0; i < writerVectors.size(); i++) { + if (writerVectors[i].empty()) + continue; + + _writerPool->schedule([this, &writer = writerVectors.at(i), &status = statusVector.at(i)]( + auto scheduleStatus) { + if (!scheduleStatus.isOK()) { + status = scheduleStatus; + } else { + status = _applyOplogBatchPerWorker(&writer); + } + }); + } + _writerPool->waitForIdle(); + + // Make sure all the workers succeeded. + for (const auto& status : statusVector) { + if (!status.isOK()) { + LOGV2_ERROR(4886012, + "Failed to apply operation in tenant migration", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "error"_attr = redact(status)); + } + uassertStatusOK(status); + } + + + LOGV2_DEBUG(4886011, + 1, + "Tenant Oplog Applier starting to write no-ops", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid); + auto lastBatchCompletedOpTimes = _writeNoOpEntries(opCtx.get(), *batch); stdx::lock_guard lk(_mutex); _lastBatchCompletedOpTimes = lastBatchCompletedOpTimes; LOGV2_DEBUG(4886002, @@ -204,14 +238,62 @@ void TenantOplogApplier::_applyOplogBatch(const TenantOplogBatch& batch) { _opTimeNotificationList.erase(_opTimeNotificationList.begin(), firstUnexpiredIter); } +void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx, + const TenantOplogBatch& batch) { + auto checkNsAndUuid = [&](const OplogEntry& op) { + if (!op.getNss().isEmpty() && !ClonerUtils::isNamespaceForTenant(op.getNss(), _tenantId)) { + LOGV2_ERROR(4886015, + "Namespace does not belong to tenant being migrated", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "nss"_attr = op.getNss()); + uasserted(4886016, "Namespace does not belong to tenant being migrated"); + } + if (!op.getUuid()) + return; + if (_knownGoodUuids.find(*op.getUuid()) != _knownGoodUuids.end()) + return; + try { + auto nss = OplogApplierUtils::parseUUIDOrNs(opCtx, op); + if (!ClonerUtils::isNamespaceForTenant(nss, _tenantId)) { + LOGV2_ERROR(4886013, + "UUID does not belong to tenant being migrated", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "UUID"_attr = *op.getUuid(), + "nss"_attr = nss.ns()); + uasserted(4886014, "UUID does not belong to tenant being migrated"); + } + _knownGoodUuids.insert(*op.getUuid()); + } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { + LOGV2_DEBUG(4886017, + 2, + "UUID for tenant being migrated does not exist", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "UUID"_attr = *op.getUuid(), + "nss"_attr = op.getNss().ns()); + } + }; + + for (const auto& op : batch.ops) { + if (op.expansionsEntry < 0 && !op.entry.isPartialTransaction()) + checkNsAndUuid(op.entry); + } + for (const auto& expansion : batch.expansions) { + for (const auto& op : expansion) { + checkNsAndUuid(op); + } + } +} + TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries( - const TenantOplogBatch& batch) { - auto opCtx = cc().makeOperationContext(); + OperationContext* opCtx, const TenantOplogBatch& batch) { auto* opObserver = cc().getServiceContext()->getOpObserver(); - WriteUnitOfWork wuow(opCtx.get()); + WriteUnitOfWork wuow(opCtx); // Reserve oplog slots for all entries. This allows us to write them in parallel. - auto oplogSlots = repl::getNextOpTimes(opCtx.get(), batch.ops.size()); + auto oplogSlots = repl::getNextOpTimes(opCtx, batch.ops.size()); auto slotIter = oplogSlots.begin(); for (const auto& op : batch.ops) { _setRecipientOpTime(op.entry.getOpTime(), *slotIter++); @@ -315,5 +397,111 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver, } }); } + +std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVectors( + OperationContext* opCtx, TenantOplogBatch* batch) { + std::vector<std::vector<const OplogEntry*>> writerVectors(_writerPool->getStats().numThreads); + CachedCollectionProperties collPropertiesCache; + + for (auto&& op : batch->ops) { + // If the operation's optime is before or the same as the beginApplyingAfterOpTime we don't + // want to apply it, so don't include it in writerVectors. + if (op.entry.getOpTime() <= _beginApplyingAfterOpTime) + continue; + uassert(4886006, + "Tenant oplog application does not support prepared transactions.", + !op.entry.shouldPrepare()); + uassert(4886007, + "Tenant oplog application does not support prepared transactions.", + !op.entry.isPreparedCommit()); + + // We never need to apply no-ops or partial transactions. + if (op.entry.getOpType() == OpTypeEnum::kNoop || op.entry.isPartialTransaction()) + continue; + + if (op.expansionsEntry >= 0) { + // This is an applyOps or transaction; add the expansions to the writer vectors. + OplogApplierUtils::addDerivedOps(opCtx, + &batch->expansions[op.expansionsEntry], + &writerVectors, + &collPropertiesCache, + false /* serial */); + } else { + // Add a single op to the writer vectors. + OplogApplierUtils::addToWriterVector( + opCtx, &op.entry, &writerVectors, &collPropertiesCache); + } + } + return writerVectors; +} + +Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts( + OperationContext* opCtx, + const OplogEntryOrGroupedInserts& entryOrGroupedInserts, + OplogApplication::Mode oplogApplicationMode) { + // We must ensure the opCtx uses replicated writes, because that will ensure we get a NotMaster + // error if a stepdown occurs. + invariant(opCtx->writesAreReplicated()); + + // Ensure context matches that of _applyOplogBatchPerWorker. + invariant(oplogApplicationMode == OplogApplication::Mode::kInitialSync); + + auto op = entryOrGroupedInserts.getOp(); + if (op.isIndexCommandType()) { + // TODO(SERVER-48862): Handle index builds during oplog application. + LOGV2_ERROR(488610, + "Index operations are not currently supported in tenant migration", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "op"_attr = redact(op.toBSON())); + + return Status::OK(); + } + // We don't count tenant application in the ops applied stats. + auto incrementOpsAppliedStats = [] {}; + // We always use oplog application mode 'kInitialSync', because we're applying oplog entries to + // a cloned database the way initial sync does. + auto status = OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon( + opCtx, + entryOrGroupedInserts, + OplogApplication::Mode::kInitialSync, + incrementOpsAppliedStats, + nullptr /* opCounters*/); + LOGV2_DEBUG(4886009, + 2, + "Applied tenant operation", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "error"_attr = status, + "op"_attr = redact(op.toBSON())); + return status; +} + +Status TenantOplogApplier::_applyOplogBatchPerWorker(std::vector<const OplogEntry*>* ops) { + auto opCtx = cc().makeOperationContext(); + tenantMigrationRecipientInfo(opCtx.get()) = + boost::make_optional<TenantMigrationRecipientInfo>(_migrationUuid); + + const bool allowNamespaceNotFoundErrorsOnCrudOps(true); + auto status = OplogApplierUtils::applyOplogBatchCommon( + opCtx.get(), + ops, + OplogApplication::Mode::kInitialSync, + allowNamespaceNotFoundErrorsOnCrudOps, + [this](OperationContext* opCtx, + const OplogEntryOrGroupedInserts& opOrInserts, + OplogApplication::Mode mode) { + return _applyOplogEntryOrGroupedInserts(opCtx, opOrInserts, mode); + }); + if (!status.isOK()) { + LOGV2_ERROR(4886008, + "Tenant migration writer worker batch application failed", + "tenant"_attr = _tenantId, + "migrationUuid"_attr = _migrationUuid, + "error"_attr = redact(status)); + } + return status; +} + } // namespace repl } // namespace mongo |