summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/tenant_oplog_applier.cpp
diff options
context:
space:
mode:
author: Matthew Russotto <matthew.russotto@mongodb.com> 2020-08-24 10:34:43 -0400
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-08-24 16:13:09 +0000
commit: f98014e47b4163021e4d6ce07f920703e4aede81 (patch)
tree: dcd1835f375a88efc8d72c7ecf7ad9c50ca63ad8 /src/mongo/db/repl/tenant_oplog_applier.cpp
parent: 43f5d658cded44a342a7e56db3ebe7ffd272abf1 (diff)
download: mongo-f98014e47b4163021e4d6ce07f920703e4aede81.tar.gz
SERVER-48860 Make migration oplog applier class apply oplog entries.
Diffstat (limited to 'src/mongo/db/repl/tenant_oplog_applier.cpp')
-rw-r--r-- src/mongo/db/repl/tenant_oplog_applier.cpp 242
1 files changed, 215 insertions, 27 deletions
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index e3aae536a18..fbc4ab9355c 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -36,10 +36,15 @@
#include <algorithm>
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/repl/apply_ops.h"
+#include "mongo/db/repl/cloner_utils.h"
+#include "mongo/db/repl/insert_group.h"
+#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
#include "mongo/db/repl/tenant_oplog_batcher.h"
#include "mongo/logv2/log.h"
@@ -68,25 +73,17 @@ TenantOplogApplier::TenantOplogApplier(const UUID& migrationUuid,
: AbstractAsyncComponent(executor.get(), std::string("TenantOplogApplier_") + tenantId),
_migrationUuid(migrationUuid),
_tenantId(tenantId),
- _applyFromOpTime(applyFromOpTime),
+ _beginApplyingAfterOpTime(applyFromOpTime),
_oplogBuffer(oplogBuffer),
_executor(std::move(executor)),
- _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps) {}
-
-void TenantOplogApplier::_makeWriterPool_inlock(int threadCount) {
- ThreadPool::Options options;
- options.threadNamePrefix = "TenantOplogWriter-" + _tenantId + "-";
- options.poolName = "TenantOplogWriterThreadPool-" + _tenantId;
- options.maxThreads = options.minThreads = static_cast<size_t>(threadCount);
- options.onCreateThread = [migrationUuid = this->_migrationUuid](const std::string&) {
- Client::initThread(getThreadName());
- AuthorizationSession::get(cc())->grantInternalAuthorization(&cc());
- };
- _writerPool = std::make_unique<ThreadPool>(options);
- _writerPool->startup();
-}
+ _limits(kTenantApplierBatchSizeBytes, kTenantApplierBatchSizeOps),
+ _applierThreadCount(kTenantApplierThreadCount) {}
-TenantOplogApplier::~TenantOplogApplier() {}
+
+TenantOplogApplier::~TenantOplogApplier() {
+ shutdown();
+ join();
+}
SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationForOpTime(
OpTime donorOpTime) {
@@ -107,7 +104,7 @@ SemiFuture<TenantOplogApplier::OpTimePair> TenantOplogApplier::getNotificationFo
}
Status TenantOplogApplier::_doStartup_inlock() noexcept {
- _makeWriterPool_inlock(kTenantApplierThreadCount);
+ _writerPool = makeReplWriterPool(_applierThreadCount, "TenantOplogWriter"_sd);
_oplogBatcher = std::make_unique<TenantOplogBatcher>(_tenantId, _oplogBuffer, _executor);
auto status = _oplogBatcher->startup();
if (!status.isOK())
@@ -132,7 +129,7 @@ void TenantOplogApplier::_applyLoop(TenantOplogBatch batch) {
// while the applier is processing the current one.
auto nextBatchFuture = _oplogBatcher->getNextBatch(_limits);
try {
- _applyOplogBatch(batch);
+ _applyOplogBatch(&batch);
} catch (const DBException& e) {
_handleError(e.toStatus());
return;
@@ -157,7 +154,7 @@ void TenantOplogApplier::_handleError(Status status) {
"TenantOplogApplier::_handleError",
"tenant"_attr = _tenantId,
"migrationUuid"_attr = _migrationUuid,
- "status"_attr = status);
+ "error"_attr = redact(status));
shutdown();
stdx::lock_guard lk(_mutex);
// If we reach _handleError, it means the applyLoop is not running.
@@ -176,15 +173,52 @@ void TenantOplogApplier::_finishShutdown(WithLock, Status status) {
_transitionToComplete_inlock();
}
-void TenantOplogApplier::_applyOplogBatch(const TenantOplogBatch& batch) {
+void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
LOGV2_DEBUG(4886004,
1,
"Tenant Oplog Applier starting to apply batch",
"tenant"_attr = _tenantId,
"migrationUuid"_attr = _migrationUuid,
- "firstDonorOptime"_attr = batch.ops.front().entry.getOpTime(),
- "lastDonorOptime"_attr = batch.ops.back().entry.getOpTime());
- auto lastBatchCompletedOpTimes = _writeNoOpEntries(batch);
+ "firstDonorOptime"_attr = batch->ops.front().entry.getOpTime(),
+ "lastDonorOptime"_attr = batch->ops.back().entry.getOpTime());
+ auto opCtx = cc().makeOperationContext();
+ _checkNsAndUuidsBelongToTenant(opCtx.get(), *batch);
+ auto writerVectors = _fillWriterVectors(opCtx.get(), batch);
+ std::vector<Status> statusVector(writerVectors.size(), Status::OK());
+ for (size_t i = 0; i < writerVectors.size(); i++) {
+ if (writerVectors[i].empty())
+ continue;
+
+ _writerPool->schedule([this, &writer = writerVectors.at(i), &status = statusVector.at(i)](
+ auto scheduleStatus) {
+ if (!scheduleStatus.isOK()) {
+ status = scheduleStatus;
+ } else {
+ status = _applyOplogBatchPerWorker(&writer);
+ }
+ });
+ }
+ _writerPool->waitForIdle();
+
+ // Make sure all the workers succeeded.
+ for (const auto& status : statusVector) {
+ if (!status.isOK()) {
+ LOGV2_ERROR(4886012,
+ "Failed to apply operation in tenant migration",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "error"_attr = redact(status));
+ }
+ uassertStatusOK(status);
+ }
+
+
+ LOGV2_DEBUG(4886011,
+ 1,
+ "Tenant Oplog Applier starting to write no-ops",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid);
+ auto lastBatchCompletedOpTimes = _writeNoOpEntries(opCtx.get(), *batch);
stdx::lock_guard lk(_mutex);
_lastBatchCompletedOpTimes = lastBatchCompletedOpTimes;
LOGV2_DEBUG(4886002,
@@ -204,14 +238,62 @@ void TenantOplogApplier::_applyOplogBatch(const TenantOplogBatch& batch) {
_opTimeNotificationList.erase(_opTimeNotificationList.begin(), firstUnexpiredIter);
}
+void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx,
+ const TenantOplogBatch& batch) {
+ auto checkNsAndUuid = [&](const OplogEntry& op) {
+ if (!op.getNss().isEmpty() && !ClonerUtils::isNamespaceForTenant(op.getNss(), _tenantId)) {
+ LOGV2_ERROR(4886015,
+ "Namespace does not belong to tenant being migrated",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "nss"_attr = op.getNss());
+ uasserted(4886016, "Namespace does not belong to tenant being migrated");
+ }
+ if (!op.getUuid())
+ return;
+ if (_knownGoodUuids.find(*op.getUuid()) != _knownGoodUuids.end())
+ return;
+ try {
+ auto nss = OplogApplierUtils::parseUUIDOrNs(opCtx, op);
+ if (!ClonerUtils::isNamespaceForTenant(nss, _tenantId)) {
+ LOGV2_ERROR(4886013,
+ "UUID does not belong to tenant being migrated",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "UUID"_attr = *op.getUuid(),
+ "nss"_attr = nss.ns());
+ uasserted(4886014, "UUID does not belong to tenant being migrated");
+ }
+ _knownGoodUuids.insert(*op.getUuid());
+ } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+ LOGV2_DEBUG(4886017,
+ 2,
+ "UUID for tenant being migrated does not exist",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "UUID"_attr = *op.getUuid(),
+ "nss"_attr = op.getNss().ns());
+ }
+ };
+
+ for (const auto& op : batch.ops) {
+ if (op.expansionsEntry < 0 && !op.entry.isPartialTransaction())
+ checkNsAndUuid(op.entry);
+ }
+ for (const auto& expansion : batch.expansions) {
+ for (const auto& op : expansion) {
+ checkNsAndUuid(op);
+ }
+ }
+}
+
TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
- const TenantOplogBatch& batch) {
- auto opCtx = cc().makeOperationContext();
+ OperationContext* opCtx, const TenantOplogBatch& batch) {
auto* opObserver = cc().getServiceContext()->getOpObserver();
- WriteUnitOfWork wuow(opCtx.get());
+ WriteUnitOfWork wuow(opCtx);
// Reserve oplog slots for all entries. This allows us to write them in parallel.
- auto oplogSlots = repl::getNextOpTimes(opCtx.get(), batch.ops.size());
+ auto oplogSlots = repl::getNextOpTimes(opCtx, batch.ops.size());
auto slotIter = oplogSlots.begin();
for (const auto& op : batch.ops) {
_setRecipientOpTime(op.entry.getOpTime(), *slotIter++);
@@ -315,5 +397,111 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
}
});
}
+
+std::vector<std::vector<const OplogEntry*>> TenantOplogApplier::_fillWriterVectors(
+ OperationContext* opCtx, TenantOplogBatch* batch) {
+ std::vector<std::vector<const OplogEntry*>> writerVectors(_writerPool->getStats().numThreads);
+ CachedCollectionProperties collPropertiesCache;
+
+ for (auto&& op : batch->ops) {
+ // If the operation's optime is before or the same as the beginApplyingAfterOpTime we don't
+ // want to apply it, so don't include it in writerVectors.
+ if (op.entry.getOpTime() <= _beginApplyingAfterOpTime)
+ continue;
+ uassert(4886006,
+ "Tenant oplog application does not support prepared transactions.",
+ !op.entry.shouldPrepare());
+ uassert(4886007,
+ "Tenant oplog application does not support prepared transactions.",
+ !op.entry.isPreparedCommit());
+
+ // We never need to apply no-ops or partial transactions.
+ if (op.entry.getOpType() == OpTypeEnum::kNoop || op.entry.isPartialTransaction())
+ continue;
+
+ if (op.expansionsEntry >= 0) {
+ // This is an applyOps or transaction; add the expansions to the writer vectors.
+ OplogApplierUtils::addDerivedOps(opCtx,
+ &batch->expansions[op.expansionsEntry],
+ &writerVectors,
+ &collPropertiesCache,
+ false /* serial */);
+ } else {
+ // Add a single op to the writer vectors.
+ OplogApplierUtils::addToWriterVector(
+ opCtx, &op.entry, &writerVectors, &collPropertiesCache);
+ }
+ }
+ return writerVectors;
+}
+
+Status TenantOplogApplier::_applyOplogEntryOrGroupedInserts(
+ OperationContext* opCtx,
+ const OplogEntryOrGroupedInserts& entryOrGroupedInserts,
+ OplogApplication::Mode oplogApplicationMode) {
+ // We must ensure the opCtx uses replicated writes, because that will ensure we get a NotMaster
+ // error if a stepdown occurs.
+ invariant(opCtx->writesAreReplicated());
+
+ // Ensure context matches that of _applyOplogBatchPerWorker.
+ invariant(oplogApplicationMode == OplogApplication::Mode::kInitialSync);
+
+ auto op = entryOrGroupedInserts.getOp();
+ if (op.isIndexCommandType()) {
+ // TODO(SERVER-48862): Handle index builds during oplog application.
+ LOGV2_ERROR(488610,
+ "Index operations are not currently supported in tenant migration",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "op"_attr = redact(op.toBSON()));
+
+ return Status::OK();
+ }
+ // We don't count tenant application in the ops applied stats.
+ auto incrementOpsAppliedStats = [] {};
+ // We always use oplog application mode 'kInitialSync', because we're applying oplog entries to
+ // a cloned database the way initial sync does.
+ auto status = OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(
+ opCtx,
+ entryOrGroupedInserts,
+ OplogApplication::Mode::kInitialSync,
+ incrementOpsAppliedStats,
+ nullptr /* opCounters*/);
+ LOGV2_DEBUG(4886009,
+ 2,
+ "Applied tenant operation",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "error"_attr = status,
+ "op"_attr = redact(op.toBSON()));
+ return status;
+}
+
+Status TenantOplogApplier::_applyOplogBatchPerWorker(std::vector<const OplogEntry*>* ops) {
+ auto opCtx = cc().makeOperationContext();
+ tenantMigrationRecipientInfo(opCtx.get()) =
+ boost::make_optional<TenantMigrationRecipientInfo>(_migrationUuid);
+
+ const bool allowNamespaceNotFoundErrorsOnCrudOps(true);
+ auto status = OplogApplierUtils::applyOplogBatchCommon(
+ opCtx.get(),
+ ops,
+ OplogApplication::Mode::kInitialSync,
+ allowNamespaceNotFoundErrorsOnCrudOps,
+ [this](OperationContext* opCtx,
+ const OplogEntryOrGroupedInserts& opOrInserts,
+ OplogApplication::Mode mode) {
+ return _applyOplogEntryOrGroupedInserts(opCtx, opOrInserts, mode);
+ });
+ if (!status.isOK()) {
+ LOGV2_ERROR(4886008,
+ "Tenant migration writer worker batch application failed",
+ "tenant"_attr = _tenantId,
+ "migrationUuid"_attr = _migrationUuid,
+ "error"_attr = redact(status));
+ }
+ return status;
+}
+
} // namespace repl
} // namespace mongo