diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/apply_ops.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/apply_ops.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_external_state_mock.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/idempotency_test_fixture.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_test.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 62 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 94 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.h | 9 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 7 | ||||
-rw-r--r-- | src/mongo/shell/replsettest.js | 4 |
18 files changed, 199 insertions, 90 deletions
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp index 223568e52a0..54971bbdee8 100644 --- a/src/mongo/db/repl/apply_ops.cpp +++ b/src/mongo/db/repl/apply_ops.cpp @@ -486,13 +486,14 @@ Status applyOps(OperationContext* opCtx, // static MultiApplier::Operations ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) { MultiApplier::Operations result; - extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result); + extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result, boost::none); return result; } void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry, const BSONObj& topLevelDoc, - MultiApplier::Operations* operations) { + MultiApplier::Operations* operations, + boost::optional<Timestamp> commitOplogEntryTS) { uassert(ErrorCodes::TypeMismatch, str::stream() << "ApplyOps::extractOperations(): not a command: " << redact(applyOpsOplogEntry.toBSON()), @@ -513,8 +514,16 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry, for (const auto& elem : operationDocs) { auto operationDoc = elem.Obj(); BSONObjBuilder builder(operationDoc); + + // Apply the ts field first if we have a commitOplogEntryTS so that appendElementsUnique + // will not overwrite this value. + if (commitOplogEntryTS) { + builder.append("ts", *commitOplogEntryTS); + } + builder.appendElementsUnique(topLevelDoc); auto operation = builder.obj(); + operations->emplace_back(operation); } } diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h index 0672ce6d05f..c5cca31569f 100644 --- a/src/mongo/db/repl/apply_ops.h +++ b/src/mongo/db/repl/apply_ops.h @@ -56,10 +56,15 @@ public: * This variant allows optimization for extracting multiple applyOps operations. The entry for * the non-DurableReplOperation fields of the extracted operation must be specified as * 'topLevelDoc', and need not be any of the applyOps operations. + * + * If a commitOplogEntryTS Timestamp is passed in, then we are extracting applyOps operations + * from a prepare oplog entry during initial sync. These operations must be timestamped at the + * commit oplog entry timestamp instead of the prepareTimestamp. */ static void extractOperationsTo(const OplogEntry& applyOpsOplogEntry, const BSONObj& topLevelDoc, - MultiApplier::Operations* operations); + MultiApplier::Operations* operations, + boost::optional<Timestamp> commitOplogEntryTS); }; /** diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 322f91a29dc..cbee6637c67 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -56,7 +56,9 @@ public: private: void _run(OplogBuffer* oplogBuffer) final {} void _shutdown() final {} - StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final { + StatusWith<OpTime> _multiApply(OperationContext* opCtx, + Operations ops, + boost::optional<repl::OplogApplication::Mode> mode) final { return _externalState->multiApplyFn(opCtx, ops, _observer); } diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 6a5fe64ba4a..ac0a2510718 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -389,7 +389,8 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence std::vector<MultiApplier::OperationPtrs> writerVectors(1); std::vector<MultiApplier::Operations> derivedOps; // Derive ops for transactions if necessary. - syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps); + syncTail.fillWriterVectors( + _opCtx.get(), &ops, &writerVectors, &derivedOps, OplogApplication::Mode::kInitialSync); const auto& opPtrs = writerVectors[0]; ASSERT_OK(runOpPtrsInitialSync(opPtrs)); diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 784a5cfba2c..2038989393e 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -1190,7 +1190,8 @@ void InitialSyncer::_getNextApplierBatchCallback( _fetchCount.store(0); MultiApplier::MultiApplyFn applyBatchOfOperationsFn = [this](OperationContext* opCtx, MultiApplier::Operations ops) { - return _oplogApplier->multiApply(opCtx, std::move(ops)); + return _oplogApplier->multiApply( + opCtx, std::move(ops), repl::OplogApplication::Mode::kInitialSync); }; OpTime lastApplied = ops.back().getOpTime(); invariant(ops.back().getWallClockTime()); diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 61b5d790bf5..24a484f1362 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -289,9 +289,11 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch( return std::move(ops); } -StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations ops) { +StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, + Operations ops, + boost::optional<repl::OplogApplication::Mode> mode) { _observer->onBatchBegin(ops); - auto lastApplied = _multiApply(opCtx, std::move(ops)); + auto lastApplied = _multiApply(opCtx, std::move(ops), mode); _observer->onBatchEnd(lastApplied, {}); return lastApplied; } diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index 71ea92b4f3f..9650036b496 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -195,7 +195,9 @@ public: * * TODO: remove when enqueue() is implemented. */ - StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops); + StatusWith<OpTime> multiApply(OperationContext* opCtx, + Operations ops, + boost::optional<repl::OplogApplication::Mode> mode); private: /** @@ -221,7 +223,9 @@ private: * Called from multiApply() to apply a batch of operations in parallel. * Implemented in subclasses but not visible otherwise. */ - virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) = 0; + virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx, + Operations ops, + boost::optional<repl::OplogApplication::Mode> mode) = 0; // Used to schedule task for oplog application loop. // Not owned by us. diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 8c293a1bbba..9b3126cb781 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -61,8 +61,9 @@ void OplogApplierImpl::_shutdown() { _syncTail.shutdown(); } -StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) { - return _syncTail.multiApply(opCtx, std::move(ops)); +StatusWith<OpTime> OplogApplierImpl::_multiApply( + OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) { + return _syncTail.multiApply(opCtx, std::move(ops), mode); } } // namespace repl diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h index d6e0e523220..741cfd4dcd3 100644 --- a/src/mongo/db/repl/oplog_applier_impl.h +++ b/src/mongo/db/repl/oplog_applier_impl.h @@ -67,7 +67,9 @@ private: void _shutdown() override; - StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override; + StatusWith<OpTime> _multiApply(OperationContext* opCtx, + Operations ops, + boost::optional<repl::OplogApplication::Mode> mode) override; // Not owned by us. ReplicationCoordinator* const _replCoord; diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp index 352359bae58..05040963774 100644 --- a/src/mongo/db/repl/oplog_applier_test.cpp +++ b/src/mongo/db/repl/oplog_applier_test.cpp @@ -56,7 +56,9 @@ public: void _run(OplogBuffer* oplogBuffer) final; void _shutdown() final; - StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final; + StatusWith<OpTime> _multiApply(OperationContext* opCtx, + Operations ops, + boost::optional<repl::OplogApplication::Mode> mode) final; }; OplogApplierMock::OplogApplierMock(OplogBuffer* oplogBuffer) @@ -66,7 +68,8 @@ void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {} void OplogApplierMock::_shutdown() {} -StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) { +StatusWith<OpTime> OplogApplierMock::_multiApply( + OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) { return OpTime(); } diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 2e8890ddaa4..4f03e19b894 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -402,7 +402,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, OplogApplier::Operations batch; while ( !(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) { - applyThroughOpTime = uassertStatusOK(oplogApplier.multiApply(opCtx, std::move(batch))); + applyThroughOpTime = uassertStatusOK( + oplogApplier.multiApply(opCtx, std::move(batch), OplogApplication::Mode::kRecovering)); } stats.complete(applyThroughOpTime); invariant(oplogBuffer.isEmpty(), diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6b33953ca12..2c3e6317dc0 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -796,7 +796,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, // Apply the operations in this batch. 'multiApply' returns the optime of the last op that // was applied, which should be the last optime in the batch. auto lastOpTimeAppliedInBatch = - fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch())); + fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch(), boost::none)); invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch); // In order to provide resilience in the event of a crash in the middle of batch @@ -847,6 +847,12 @@ inline bool isCommitApplyOps(const OplogEntry& entry) { !entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare"); } +// Returns whether a commitTransaction oplog entry is a part of a prepared transaction. +inline bool isPreparedCommit(const OplogEntry& entry) { + return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction; +} + + void SyncTail::shutdown() { stdx::lock_guard<stdx::mutex> lock(_mutex); _inShutdown = true; @@ -1109,7 +1115,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker) { + SessionUpdateTracker* sessionUpdateTracker, + boost::optional<repl::OplogApplication::Mode> mode) { const auto serviceContext = opCtx->getServiceContext(); const auto storageEngine = serviceContext->getStorageEngine(); @@ -1137,7 +1144,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, if (sessionUpdateTracker) { if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) { derivedOps->emplace_back(std::move(*newOplogWrites)); - _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + _fillWriterVectors( + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } } @@ -1199,13 +1207,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // messing up the state of the opCtx. In particular we do not want to // set the ReadSource to kLastApplied. ReadSourceScope readSourceScope(opCtx); - derivedOps->emplace_back( - readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); + derivedOps->emplace_back(readTransactionOperationsFromOplogChain( + opCtx, op, partialTxnList, boost::none)); partialTxnList.clear(); } // Transaction entries cannot have different session updates. _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } else { // The applyOps entry was not generated as part of a transaction. invariant(!op.getPrevWriteOpTimeInTransaction()); @@ -1213,7 +1221,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // Nested entries cannot have different session updates. _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } } catch (...) { fassertFailedWithStatusNoTrace( @@ -1225,6 +1233,32 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, continue; } + // If we see a commitTransaction command that is a part of a prepared transaction during + // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers + // with the extracted operations. + if (isPreparedCommit(op) && (mode == OplogApplication::Mode::kInitialSync)) { + auto logicalSessionId = op.getSessionId(); + auto& partialTxnList = partialTxnOps[*logicalSessionId]; + + { + // Traverse the oplog chain with its own snapshot and read timestamp. + ReadSourceScope readSourceScope(opCtx); + + // Get the previous oplog entry, which should be a prepare oplog entry. + const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op); + invariant(prevOplogEntry.shouldPrepare()); + + // Extract the operations from the applyOps entry. + auto commitOplogEntryOpTime = op.getOpTime(); + derivedOps->emplace_back(readTransactionOperationsFromOplogChain( + opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp())); + } + + _fillWriterVectors( + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); + continue; + } + auto& writer = (*writerVectors)[hash % numWriters]; if (writer.empty()) { writer.reserve(8); // Skip a few growth rounds @@ -1236,14 +1270,15 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, void SyncTail::fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps) { + std::vector<MultiApplier::Operations>* derivedOps, + boost::optional<repl::OplogApplication::Mode> mode) { SessionUpdateTracker sessionUpdateTracker; - _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); + _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker, mode); auto newOplogWrites = sessionUpdateTracker.flushAll(); if (!newOplogWrites.empty()) { derivedOps->emplace_back(std::move(newOplogWrites)); - _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } } @@ -1275,10 +1310,13 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors } } -StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { +StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, + MultiApplier::Operations ops, + boost::optional<repl::OplogApplication::Mode> mode) { invariant(!ops.empty()); LOG(2) << "replication batch size is " << ops.size(); + // Stop all readers until we're done. This also prevents doc-locking engines from deleting old // entries from the oplog until we finish writing. Lock::ParallelBatchWriterMode pbwm(opCtx->lockState()); @@ -1318,7 +1356,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O std::vector<MultiApplier::Operations> derivedOps; std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); - fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); + fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps, mode); // Wait for writes to finish before applying ops. _writerPool->waitForIdle(); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index fd314fd3564..5f9961c8b22 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -228,12 +228,15 @@ public: * to at least the last optime of the batch. If 'minValid' is already greater than or equal * to the last optime of this batch, it will not be updated. */ - StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops); + StatusWith<OpTime> multiApply(OperationContext* opCtx, + MultiApplier::Operations ops, + boost::optional<repl::OplogApplication::Mode> mode); void fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps); + std::vector<MultiApplier::Operations>* derivedOps, + boost::optional<repl::OplogApplication::Mode> mode); private: class OpQueueBatcher; @@ -244,7 +247,8 @@ private: MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker); + SessionUpdateTracker* sessionUpdateTracker, + boost::optional<repl::OplogApplication::Mode> mode); /** * Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 514192587ba..9ec52ed34e3 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -408,7 +408,7 @@ DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty getStorageInterface(), noopApplyOperationFn, writerPool.get()); - syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore(); + syncTail.multiApply(_opCtx.get(), {}, boost::none).getStatus().ignore(); } bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, @@ -434,7 +434,7 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx, SyncTail syncTail( nullptr, consistencyMarkers, storageInterface, applyOperationFn, writerPool.get()); - auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op})); + auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}, boost::none)); ASSERT_EQUALS(op.getOpTime(), lastOpTime); ASSERT_EQUALS(1U, operationsApplied.size()); @@ -598,7 +598,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}, boost::none)); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -613,7 +613,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { // Apply a batch with only the second operation. This should result in the second oplog entry // being put in the oplog, but with no effect because the operation is part of a pending // transaction. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}, boost::none)); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -629,7 +629,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) { // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the two previous entries being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}, boost::none)); ASSERT_EQ(3U, oplogDocs().size()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -656,7 +656,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) { // Apply both inserts and the commit in a single batch. We expect no oplog entries to // be inserted (because we've set skipWritesToOplog), and both entries to be committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp})); + ASSERT_OK( + syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}, boost::none)); ASSERT_EQ(0U, oplogDocs().size()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -710,7 +711,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { // Insert the first entry in its own batch. This should result in the oplog entry being written // but the entry should not be applied as it is part of a pending transaction. const auto expectedStartOpTime = insertOps[0].getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}, boost::none)); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_EQ(0U, _insertedDocs[_nss1].size()); ASSERT_EQ(0U, _insertedDocs[_nss2].size()); @@ -723,8 +724,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) { // Insert the rest of the entries, including the commit. These entries should be added to the // oplog, and all the entries including the first should be applied. - ASSERT_OK( - syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}, boost::none)); ASSERT_EQ(5U, oplogDocs().size()); ASSERT_EQ(3U, _insertedDocs[_nss1].size()); ASSERT_EQ(1U, _insertedDocs[_nss2].size()); @@ -847,7 +848,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) { // once. ASSERT_OK(syncTail.multiApply( _opCtx.get(), - {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2})); + {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}, + boost::none)); ASSERT_EQ(6U, oplogDocs().size()); ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore); ASSERT_EQ(4U, _insertedDocs[_nss1].size()); @@ -956,7 +958,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none)); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); @@ -972,7 +974,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea // Apply a batch with only the prepare. This should result in the prepare being put in the // oplog, and the two previous entries being applied (but in a transaction) along with the // nested insert in the prepare oplog entry. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none)); ASSERT_EQ(3U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); @@ -986,7 +988,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the three previous entries being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none)); ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1006,7 +1008,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none)); checkTxnTable(_lsid, _txnNum, _insertOp1->getOpTime(), @@ -1017,7 +1019,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio // Apply a batch with only the prepare. This should result in the prepare being put in the // oplog, and the two previous entries being applied (but in a transaction) along with the // nested insert in the prepare oplog entry. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none)); checkTxnTable(_lsid, _txnNum, _prepareWithPrevOp->getOpTime(), @@ -1027,7 +1029,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio // Apply a batch with only the abort. This should result in the abort being put in the // oplog and the transaction table being updated accordingly. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}, boost::none)); ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1051,7 +1053,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // being put in the oplog and updating the transaction table, but not actually being applied // because they are part of a pending transaction. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {*_insertOp1, *_insertOp2}, OplogApplication::Mode::kInitialSync)); ASSERT_EQ(2U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]); ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]); @@ -1066,7 +1069,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, but, since this is initial sync, nothing else. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {*_prepareWithPrevOp}, OplogApplication::Mode::kInitialSync)); ASSERT_EQ(3U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -1080,7 +1084,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the three previous entries being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {*_commitPrepareWithPrevOp}, OplogApplication::Mode::kInitialSync)); ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1115,7 +1120,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco // Apply a batch with the insert operations. This should have no effect, because this is // recovery. const auto expectedStartOpTime = _insertOp1->getOpTime(); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none)); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1128,7 +1133,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco // Apply a batch with only the prepare applyOps. This should have no effect, since this is // recovery. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none)); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1141,7 +1146,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco // Apply a batch with only the commit. This should result in the the three previous entries // being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none)); ASSERT_TRUE(oplogDocs().empty()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_EQ(2U, _insertedDocs[_nss2].size()); @@ -1161,7 +1166,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none)); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); @@ -1174,7 +1179,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and prepared insert being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none)); ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1202,7 +1207,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}, boost::none)); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -1215,7 +1220,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and prepared insert being committed. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none)); ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1234,7 +1239,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime(); // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, and the nested insert being applied (but in a transaction). - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none)); checkTxnTable(_lsid, _txnNum, _singlePrepareApplyOp->getOpTime(), @@ -1244,7 +1249,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep // Apply a batch with only the abort. This should result in the abort being put in the // oplog and the transaction table being updated accordingly. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}, boost::none)); ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1269,7 +1274,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the prepare applyOps. This should result in the prepare being put in // the oplog, but, since this is initial sync, nothing else. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {*_singlePrepareApplyOp}, OplogApplication::Mode::kInitialSync)); ASSERT_EQ(1U, oplogDocs().size()); ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); @@ -1283,7 +1289,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the commit. This should result in the commit being put in the // oplog, and the previous entry being applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {*_commitSinglePrepareApplyOp}, OplogApplication::Mode::kInitialSync)); ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1319,7 +1326,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the prepare applyOps. This should have no effect, since this is // recovery. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none)); ASSERT_TRUE(oplogDocs().empty()); ASSERT_TRUE(_insertedDocs[_nss1].empty()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -1332,7 +1339,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, // Apply a batch with only the commit. This should result in the previous entry being // applied. - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none)); ASSERT_TRUE(oplogDocs().empty()); ASSERT_EQ(1U, _insertedDocs[_nss1].size()); ASSERT_TRUE(_insertedDocs[_nss2].empty()); @@ -2551,7 +2558,7 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) { auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}, boost::none)); checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date); } @@ -2582,7 +2589,7 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) { auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}, boost::none)); ASSERT_FALSE(docExists( _opCtx.get(), @@ -2626,7 +2633,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTa auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}, boost::none)); checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date); } @@ -2658,7 +2665,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTa auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}, boost::none)); checkTxnTable(sessionInfo, newWriteOpTime, date); } @@ -2719,7 +2726,8 @@ TEST_F(SyncTailTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSessi SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}, boost::none)); repl::checkTxnTable(_opCtx.get(), *sessionInfo.getSessionId(), @@ -2785,7 +2793,8 @@ TEST_F(SyncTailTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSessi SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp})); + ASSERT_OK(syncTail.multiApply( + _opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}, boost::none)); repl::checkTxnTable(_opCtx.get(), *sessionInfo.getSessionId(), @@ -2850,7 +2859,8 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) { nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); ASSERT_OK(syncTail.multiApply( _opCtx.get(), - {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn})); + {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}, + boost::none)); // The txnNum and optime of the only write were saved. auto resultSingleDoc = @@ -2922,7 +2932,7 @@ TEST_F(SyncTailTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) { auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}, boost::none)); checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate); } @@ -2945,7 +2955,7 @@ TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) { auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}, boost::none)); ASSERT_FALSE(docExists(_opCtx.get(), NamespaceString::kSessionTransactionsTableNamespace, @@ -2970,7 +2980,7 @@ TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) { auto writerPool = OplogApplier::makeWriterPool(); SyncTail syncTail( nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get()); - ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog})); + ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}, boost::none)); ASSERT_FALSE(docExists( _opCtx.get(), diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index cf0a9d49c94..8f376b44d18 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -100,12 +100,8 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx, ReadSourceScope readSourceScope(opCtx); // Get the corresponding prepare applyOps oplog entry. - const auto prepareOpTime = entry.getPrevWriteOpTimeInTransaction(); - invariant(prepareOpTime); - TransactionHistoryIterator iter(prepareOpTime.get()); - invariant(iter.hasNext()); - const auto prepareOplogEntry = iter.nextFatalOnErrors(opCtx); - ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {}); + const auto prepareOplogEntry = getPreviousOplogEntry(opCtx, entry); + ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {}, boost::none); } const auto dbName = entry.getNss().db().toString(); @@ -137,6 +133,21 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx, } } // namespace +/** + * Helper used to get previous oplog entry from the same transaction. + */ +const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx, + const repl::OplogEntry& entry) { + const auto prevOpTime = entry.getPrevWriteOpTimeInTransaction(); + invariant(prevOpTime); + TransactionHistoryIterator iter(prevOpTime.get()); + invariant(iter.hasNext()); + const auto prevOplogEntry = iter.next(opCtx); + + return prevOplogEntry; +} + + Status applyCommitTransaction(OperationContext* opCtx, const OplogEntry& entry, repl::OplogApplication::Mode mode) { @@ -150,8 +161,7 @@ Status applyCommitTransaction(OperationContext* opCtx, auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject()); invariant(commitCommand.getCommitTimestamp()); - if (mode == repl::OplogApplication::Mode::kRecovering || - mode == repl::OplogApplication::Mode::kInitialSync) { + if (mode == repl::OplogApplication::Mode::kRecovering) { return _applyTransactionFromOplogChain(opCtx, entry, mode, @@ -216,7 +226,8 @@ Status applyAbortTransaction(OperationContext* opCtx, repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( OperationContext* opCtx, const OplogEntry& commitOrPrepare, - const std::vector<OplogEntry*>& cachedOps) { + const std::vector<OplogEntry*>& cachedOps, + boost::optional<Timestamp> commitOplogEntryTS) { repl::MultiApplier::Operations ops; // Get the previous oplog entry. @@ -245,7 +256,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( const auto& operationEntry = iter.nextFatalOnErrors(opCtx); invariant(operationEntry.isPartialTransaction()); auto prevOpsEnd = ops.size(); - repl::ApplyOps::extractOperationsTo(operationEntry, commitOrPrepareObj, &ops); + repl::ApplyOps::extractOperationsTo( + operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS); // Because BSONArrays do not have fast way of determining size without iterating through // them, and we also have no way of knowing how many oplog entries are in a transaction // without iterating, reversing each applyOps and then reversing the whole array is @@ -260,12 +272,14 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( for (auto* cachedOp : cachedOps) { const auto& operationEntry = *cachedOp; invariant(operationEntry.isPartialTransaction()); - repl::ApplyOps::extractOperationsTo(operationEntry, commitOrPrepareObj, &ops); + repl::ApplyOps::extractOperationsTo( + operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS); } // Reconstruct the operations from the commit or prepare oplog entry. if (commitOrPrepare.getCommandType() == OplogEntry::CommandType::kApplyOps) { - repl::ApplyOps::extractOperationsTo(commitOrPrepare, commitOrPrepareObj, &ops); + repl::ApplyOps::extractOperationsTo( + commitOrPrepare, commitOrPrepareObj, &ops, commitOplogEntryTS); } return ops; } @@ -284,7 +298,7 @@ Status _applyPrepareTransaction(OperationContext* opCtx, // The prepare time of the transaction is set explicitly below. auto ops = [&] { ReadSourceScope readSourceScope(opCtx); - return readTransactionOperationsFromOplogChain(opCtx, entry, {}); + return readTransactionOperationsFromOplogChain(opCtx, entry, {}, boost::none); }(); if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering || diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h index dc145d2c701..5ad20f48875 100644 --- a/src/mongo/db/repl/transaction_oplog_application.h +++ b/src/mongo/db/repl/transaction_oplog_application.h @@ -51,13 +51,20 @@ Status applyAbortTransaction(OperationContext* opCtx, repl::OplogApplication::Mode mode); /** + * Helper used to get previous oplog entry from the same transaction. + */ +const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx, + const repl::OplogEntry& entry); + +/** * Follow an oplog chain and copy the operations to destination. Operations will be copied in * forward oplog order (increasing optimes). */ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain( OperationContext* opCtx, const repl::OplogEntry& entry, - const std::vector<repl::OplogEntry*>& cachedOps); + const std::vector<repl::OplogEntry*>& cachedOps, + boost::optional<Timestamp> commitOplogEntryTS); /** * Apply `prepareTransaction` oplog entry. diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 10a7c57f407..6208fe8944d 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1386,7 +1386,8 @@ public: storageInterface, {}, writerPool.get()); - ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops))); + ASSERT_EQUALS(op2.getOpTime(), + unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none))); AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX); assertMultikeyPaths( @@ -1495,7 +1496,7 @@ public: storageInterface, options, writerPool.get()); - auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)); + auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none)); ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp()); // Wait for the index build to finish before making any assertions. @@ -2478,7 +2479,7 @@ public: auto writerPool = repl::OplogApplier::makeWriterPool(1); repl::SyncTail syncTail( nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get()); - auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp})); + auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}, boost::none)); ASSERT_EQ(insertOp.getOpTime(), lastOpTime); joinGuard.dismiss(); diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index 8d41cacaef4..a9d13d29672 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -2293,6 +2293,10 @@ var ReplSetTest = function(opts) { } baseOptions = Object.merge(baseOptions, this.nodeOptions["n" + n]); options = Object.merge(baseOptions, options); + if (options.hasOwnProperty("rsConfig")) { + this.nodeOptions["n" + n] = + Object.merge(this.nodeOptions["n" + n], {rsConfig: options.rsConfig}); + } delete options.rsConfig; options.restart = options.restart || restart; |