summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/tenant_oplog_applier.cpp
diff options
context:
space:
mode:
author: Judah Schvimer <judah@mongodb.com> 2020-10-08 15:52:39 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-10-29 03:03:37 +0000
commit: 0ccc9275efc3e4c36850bd4cc297c90152b7a7e6 (patch)
tree: fb161c1b3289dd062c6b07434b1daab90af87bb8 /src/mongo/db/repl/tenant_oplog_applier.cpp
parent: b4cd12f6dddc0d84ca1396176b39260c1777fba8 (diff)
download: mongo-0ccc9275efc3e4c36850bd4cc297c90152b7a7e6.tar.gz
SERVER-51246 Write a noop into the oplog buffer after each batch to ensure tenant applier reaches stop timestamp
Diffstat (limited to 'src/mongo/db/repl/tenant_oplog_applier.cpp')
-rw-r--r-- src/mongo/db/repl/tenant_oplog_applier.cpp 49
1 files changed, 40 insertions, 9 deletions
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 72c5294867f..157961b3749 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/repl/insert_group.h"
#include "mongo/db/repl/oplog_applier_utils.h"
#include "mongo/db/repl/tenant_migration_decoration.h"
+#include "mongo/db/repl/tenant_migration_recipient_service.h"
#include "mongo/db/repl/tenant_oplog_batcher.h"
#include "mongo/logv2/log.h"
#include "mongo/util/concurrency/thread_pool.h"
@@ -224,8 +225,7 @@ void TenantOplogApplier::_applyOplogBatch(TenantOplogBatch* batch) {
"Tenant Oplog Applier finished applying batch",
"tenant"_attr = _tenantId,
"migrationUuid"_attr = _migrationUuid,
- "lastDonorOptime"_attr = lastBatchCompletedOpTimes.donorOpTime,
- "lastRecipientOptime"_attr = lastBatchCompletedOpTimes.recipientOpTime);
+ "lastBatchCompletedOpTimes"_attr = lastBatchCompletedOpTimes);
// Notify all the waiters on optimes before and including _lastBatchCompletedOpTimes.
auto firstUnexpiredIter =
@@ -285,6 +285,21 @@ void TenantOplogApplier::_checkNsAndUuidsBelongToTenant(OperationContext* opCtx,
}
}
+namespace {
+/**
+ * Returns true if 'entry' is a resume token noop: an oplog entry of type kNoop whose object has
+ * a "msg" field matching TenantMigrationRecipientService::kNoopMsg. Every other entry yields
+ * false.
+ */
+bool isResumeTokenNoop(const OplogEntry& entry) {
+    if (entry.getOpType() == OpTypeEnum::kNoop) {
+        const auto& payload = entry.getObject();
+        if (payload.hasField("msg")) {
+            // A noop carrying any other message is not a resume token noop.
+            const bool isOtherMsg =
+                payload.getStringField("msg") != TenantMigrationRecipientService::kNoopMsg;
+            return !isOtherMsg;
+        }
+    }
+    return false;
+}
+} // namespace
+
TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
OperationContext* opCtx, const TenantOplogBatch& batch) {
auto* opObserver = cc().getServiceContext()->getOpObserver();
@@ -292,8 +307,18 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
WriteUnitOfWork wuow(opCtx);
// Reserve oplog slots for all entries. This allows us to write them in parallel.
auto oplogSlots = repl::getNextOpTimes(opCtx, batch.ops.size());
+ // Keep track of the greatest oplog slot actually used, ignoring resume token noops. This is
+ // what we want to return from this function.
+ auto greatestOplogSlotUsed = OpTime();
auto slotIter = oplogSlots.begin();
for (const auto& op : batch.ops) {
+ if (isResumeTokenNoop(op.entry)) {
+ // We do not want to set the recipient optime for resume token noop oplog entries since
+ // we won't actually apply them.
+ slotIter++;
+ continue;
+ }
+ greatestOplogSlotUsed = *slotIter;
_setRecipientOpTime(op.entry.getOpTime(), *slotIter++);
}
const size_t numOplogThreads = _writerPool->getStats().numThreads;
@@ -326,7 +351,7 @@ TenantOplogApplier::OpTimePair TenantOplogApplier::_writeNoOpEntries(
}
invariant(opsIter == batch.ops.end());
_writerPool->waitForIdle();
- return {batch.ops.back().entry.getOpTime(), oplogSlots.back()};
+ return {batch.ops.back().entry.getOpTime(), greatestOplogSlotUsed};
}
@@ -379,18 +404,24 @@ void TenantOplogApplier::_writeNoOpsForRange(OpObserver* opObserver,
WriteUnitOfWork wuow(opCtx.get());
auto slot = firstSlot;
for (auto iter = begin; iter != end; iter++, slot++) {
+ const auto& entry = iter->entry;
+ if (isResumeTokenNoop(entry)) {
+ // We don't want to write noops for resume token noop oplog entries. They would
+ // not be applied in a change stream anyways.
+ continue;
+ }
opObserver->onInternalOpMessage(
opCtx.get(),
- iter->entry.getNss(),
- iter->entry.getUuid(),
- iter->entry.toBSON(),
+ entry.getNss(),
+ entry.getUuid(),
+ entry.toBSON(),
BSONObj(),
// We link the no-ops together by recipient op time the same way the actual ops
// were linked together by donor op time. This is to allow retryable writes
// and changestreams to find the ops they need.
- _maybeGetRecipientOpTime(iter->entry.getPreImageOpTime()),
- _maybeGetRecipientOpTime(iter->entry.getPostImageOpTime()),
- _maybeGetRecipientOpTime(iter->entry.getPrevWriteOpTimeInTransaction()),
+ _maybeGetRecipientOpTime(entry.getPreImageOpTime()),
+ _maybeGetRecipientOpTime(entry.getPostImageOpTime()),
+ _maybeGetRecipientOpTime(entry.getPrevWriteOpTimeInTransaction()),
*slot);
}
});